Repository: hive
Updated Branches:
  refs/heads/master ad20ff4b1 -> 28f7d19af


HIVE-19154: Poll notification events to invalidate the results cache (Jason Dere, reviewed by GopalV)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/28f7d19a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/28f7d19a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/28f7d19a

Branch: refs/heads/master
Commit: 28f7d19af05fbee2c80102e4c39684e4a5c33697
Parents: ad20ff4
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Apr 16 17:07:23 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Apr 16 17:07:23 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 +
 .../test/resources/testconfiguration.properties |   1 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   6 +
 .../ql/cache/results/QueryResultsCache.java     | 166 +++++++-
 .../hive/ql/metadata/events/EventConsumer.java  |  28 ++
 .../metadata/events/NotificationEventPoll.java  | 162 ++++++++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   3 +-
 .../results_cache_invalidation2.q               |  64 ++++
 .../llap/results_cache_invalidation2.q.out      | 380 +++++++++++++++++++
 .../results_cache_invalidation2.q.out           | 373 ++++++++++++++++++
 .../apache/hive/service/server/HiveServer2.java |   7 +
 11 files changed, 1179 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4da4e1d..b8d948d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4295,6 +4295,16 @@ public class HiveConf extends Configuration {
         (long) 10 * 1024 * 1024,
         "Maximum size in bytes that a single query result is allowed to use in the results cache directory"),
 
+    HIVE_NOTFICATION_EVENT_POLL_INTERVAL("hive.notification.event.poll.interval", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "How often the notification log is polled for new NotificationEvents from the metastore. " +
+        "A nonpositive value means the notification log is never polled."),
+
+    HIVE_NOTFICATION_EVENT_CONSUMERS("hive.notification.event.consumers",
+        "org.apache.hadoop.hive.ql.cache.results.QueryResultsCache$InvalidationEventConsumer",
+        "Comma-separated list of class names extending EventConsumer, " +
+        "to handle the NotificationEvents retrieved by the notification event poll."),
+
     /* BLOBSTORE section */
 
     HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",

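As a quick illustration of how the two new settings are read back (a minimal sketch; the class name and the "30s" override are hypothetical examples, not part of this commit):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Sketch: reading the new invalidation-poll settings out of a HiveConf.
    public class PollConfigExample {  // hypothetical class name
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.set("hive.notification.event.poll.interval", "30s");  // example override of the 60s default
        long pollMillis = HiveConf.getTimeVar(conf,
            HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, TimeUnit.MILLISECONDS);
        String[] consumerClasses =
            conf.getStrings(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_CONSUMERS.varname);
        System.out.println(pollMillis + " ms, consumers: " + String.join(",", consumerClasses));
      }
    }
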
http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 2b37053..d26f0cc 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -224,6 +224,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   results_cache_empty_result.q,\
   results_cache_invalidation.q,\
   results_cache_transactional.q,\
+  results_cache_invalidation2.q,\
   sample1.q,\
   selectDistinctStar.q,\
   select_dummy_source.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 574b822..9fd87b6 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ParseDriver;
@@ -1050,6 +1051,7 @@ public class QTestUtil {
     }
 
     // Remove any cached results from the previous test.
+    NotificationEventPoll.shutdown();
     QueryResultsCache.cleanupInstance();
 
     // allocate and initialize a new conf since a test can
@@ -2236,4 +2238,8 @@ public class QTestUtil {
   public QOutProcessor getQOutProcessor() {
     return qOutProcessor;
   }
+
+  public static void initEventNotificationPoll() throws Exception {
+    NotificationEventPoll.initialize(SessionState.get().getConf());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 90c8ec3..56a9faa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -43,8 +44,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,12 +61,17 @@ import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.events.EventConsumer;
 import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
 import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -95,6 +104,7 @@ public final class QueryResultsCache {
   }
 
   public static class QueryInfo {
+    private long queryTime;
     private LookupInfo lookupInfo;
     private HiveOperation hiveOperation;
     private List<FieldSchema> resultSchema;
@@ -103,12 +113,14 @@ public final class QueryResultsCache {
     private Set<ReadEntity> inputs;
 
     public QueryInfo(
+        long queryTime,
         LookupInfo lookupInfo,
         HiveOperation hiveOperation,
         List<FieldSchema> resultSchema,
         TableAccessInfo tableAccessInfo,
         ColumnAccessInfo columnAccessInfo,
         Set<ReadEntity> inputs) {
+      this.queryTime = queryTime;
       this.lookupInfo = lookupInfo;
       this.hiveOperation = hiveOperation;
       this.resultSchema = resultSchema;
@@ -164,6 +176,14 @@ public final class QueryResultsCache {
     public void setInputs(Set<ReadEntity> inputs) {
       this.inputs = inputs;
     }
+
+    public long getQueryTime() {
+      return queryTime;
+    }
+
+    public void setQueryTime(long queryTime) {
+      this.queryTime = queryTime;
+    }
   }
 
   public enum CacheEntryStatus {
@@ -176,7 +196,6 @@ public final class QueryResultsCache {
     private Path cachedResultsPath;
 
     // Cache administration
-    private long createTime;
     private long size;
     private AtomicInteger readers = new AtomicInteger(0);
     private ScheduledFuture<?> invalidationFuture = null;
@@ -311,6 +330,12 @@ public final class QueryResultsCache {
         }
       }
     }
+
+    public Stream<String> getTableNames() {
+      return queryInfo.getInputs().stream()
+          .filter(readEntity -> readEntity.getType() == Type.TABLE)
+          .map(readEntity -> readEntity.getTable().getFullyQualifiedName());
+    }
   }
 
   // Allow lookup by query string
@@ -321,6 +346,9 @@ public final class QueryResultsCache {
   private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap(
       new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true));
 
+  // Lookup of cache entries by table used in the query, for cache invalidation.
+  private final Map<String, Set<CacheEntry>> tableToEntryMap = new HashMap<>();
+
   private final HiveConf conf;
   private Path cacheDirPath;
   private Path zeroRowsPath;
@@ -329,6 +357,7 @@ public final class QueryResultsCache {
   private long maxEntrySize;
   private long maxEntryLifetime;
   private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+  private ScheduledFuture<?> invalidationPollFuture;
 
   private QueryResultsCache(HiveConf configuration) throws IOException {
     this.conf = configuration;
@@ -472,13 +501,11 @@ public final class QueryResultsCache {
       LOG.info("Adding placeholder cache entry for query '{}'", queryText);
 
       // Add the entry to the cache structures while under write lock.
-      Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
-      if (entriesForQuery == null) {
-        entriesForQuery = new HashSet<CacheEntry>();
-        queryMap.put(queryText, entriesForQuery);
-      }
-      entriesForQuery.add(addedEntry);
+      addToEntryMap(queryMap, queryText, addedEntry);
       lru.put(addedEntry, addedEntry);
+      // Index of entries by table usage.
+      addedEntry.getTableNames()
+          .forEach(tableName -> addToEntryMap(tableToEntryMap, tableName, addedEntry));
     } finally {
       writeLock.unlock();
     }
@@ -544,7 +571,6 @@ public final class QueryResultsCache {
         cacheEntry.cachedResultsPath = cachedResultsPath;
         cacheEntry.size = resultSize;
         this.cacheSize += resultSize;
-        cacheEntry.createTime = System.currentTimeMillis();
         cacheEntry.txnWriteIdList = txnWriteIdList;
 
         cacheEntry.setStatus(CacheEntryStatus.VALID);
@@ -616,6 +642,32 @@ public final class QueryResultsCache {
     }
   }
 
+  public void notifyTableChanged(String dbName, String tableName, long updateTime) {
+    LOG.debug("Table changed: {}.{}, at {}", dbName, tableName, updateTime);
+    // Invalidate all cache entries using this table.
+    List<CacheEntry> entriesToInvalidate = null;
+    rwLock.writeLock().lock();
+    try {
+      String key = (dbName.toLowerCase() + "." + tableName.toLowerCase());
+      Set<CacheEntry> entriesForTable = tableToEntryMap.get(key);
+      if (entriesForTable != null) {
+        // Possible concurrent modification issues if we try to remove cache entries while
+        // traversing the cache structures. Save the entries to remove in a separate list.
+        entriesToInvalidate = new ArrayList<>(entriesForTable);
+      }
+      if (entriesToInvalidate != null) {
+        for (CacheEntry entry : entriesToInvalidate) {
+          // Ignore updates that occurred before this cached query was created.
+          if (entry.getQueryInfo().getQueryTime() <= updateTime) {
+            removeEntry(entry);
+          }
+        }
+      }
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+  }
+
   private static final int INITIAL_LRU_SIZE = 16;
   private static final float LRU_LOAD_FACTOR = 0.75f;
   private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {};
@@ -690,18 +742,13 @@ public final class QueryResultsCache {
 
   private void removeFromLookup(CacheEntry entry) {
     String queryString = entry.getQueryText();
-    Set<CacheEntry> entries = queryMap.get(queryString);
-    if (entries == null) {
-      LOG.warn("ResultsCache: no entry for {}", queryString);
-      return;
-    }
-    boolean deleted = entries.remove(entry);
-    if (!deleted) {
-      LOG.warn("ResultsCache: Attempted to remove entry but it was not in the cache: {}", entry);
-    }
-    if (entries.isEmpty()) {
-      queryMap.remove(queryString);
+    if (!removeFromEntryMap(queryMap, queryString, entry)) {
+      LOG.warn("Attempted to remove entry but it was not in the cache: {}", entry);
     }
+
+    // Remove this entry from the table usage mappings.
+    entry.getTableNames()
+        .forEach(tableName -> removeFromEntryMap(tableToEntryMap, tableName, entry));
   }
 
   private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException {
@@ -783,13 +830,42 @@ public final class QueryResultsCache {
     return false;
   }
 
+  private static void addToEntryMap(Map<String, Set<CacheEntry>> entryMap,
+      String key, CacheEntry entry) {
+    Set<CacheEntry> entriesForKey = entryMap.get(key);
+    if (entriesForKey == null) {
+      entriesForKey = new HashSet<CacheEntry>();
+      entryMap.put(key, entriesForKey);
+    }
+    entriesForKey.add(entry);
+  }
+
+  private static boolean removeFromEntryMap(Map<String, Set<CacheEntry>> entryMap,
+      String key, CacheEntry entry) {
+    Set<CacheEntry> entries = entryMap.get(key);
+    if (entries == null) {
+      return false;
+    }
+    boolean deleted = entries.remove(entry);
+    if (!deleted) {
+      return false;
+    }
+    if (entries.isEmpty()) {
+      entryMap.remove(key);
+    }
+    return true;
+  }
 
   @VisibleForTesting
   public static void cleanupInstance() {
     // This should only ever be called in testing scenarios.
     // There should not be any other users of the cache or its entries or this may mess up cleanup.
     if (inited.get()) {
-      getInstance().clear();
+      if (instance.invalidationPollFuture != null) {
+        instance.invalidationPollFuture.cancel(true);
+        instance.invalidationPollFuture = null;
+      }
+      instance.clear();
       instance = null;
       inited.set(false);
     }
@@ -880,4 +956,54 @@ public final class QueryResultsCache {
     metrics.addGauge(MetricsConstant.QC_MAX_SIZE, maxCacheSize);
     metrics.addGauge(MetricsConstant.QC_CURRENT_SIZE, curCacheSize);
   }
+
+  // EventConsumer to invalidate cache entries based on metastore notification events (alter table, add partition, etc.).
+  public static class InvalidationEventConsumer implements EventConsumer {
+    Configuration conf;
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public void accept(NotificationEvent event) {
+      String dbName;
+      String tableName;
+
+      switch (event.getEventType()) {
+      case MessageFactory.ADD_PARTITION_EVENT:
+      case MessageFactory.ALTER_PARTITION_EVENT:
+      case MessageFactory.DROP_PARTITION_EVENT:
+      case MessageFactory.ALTER_TABLE_EVENT:
+      case MessageFactory.DROP_TABLE_EVENT:
+      case MessageFactory.INSERT_EVENT:
+        dbName = event.getDbName();
+        tableName = event.getTableName();
+        break;
+      default:
+        return;
+      }
+
+      if (dbName == null || tableName == null) {
+        LOG.info("Possibly malformed notification event, missing db or table name: {}", event);
+        return;
+      }
+
+      LOG.debug("Handling event {} on table {}.{}", event.getEventType(), dbName, tableName);
+
+      QueryResultsCache cache = QueryResultsCache.getInstance();
+      if (cache != null) {
+        long eventTime = event.getEventTime() * 1000L;
+        cache.notifyTableChanged(dbName, tableName, eventTime);
+      } else {
+        LOG.debug("Cache not instantiated, skipping event on {}.{}", dbName, tableName);
+      }
+    }
+  }
 }

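One subtlety in the hunks above: NotificationEvent.getEventTime() is expressed in seconds while QueryInfo.getQueryTime() is in milliseconds, hence the * 1000L conversion before calling notifyTableChanged(). A toy illustration of the invalidation predicate, with made-up timestamps:

    // Hypothetical values; the predicate mirrors notifyTableChanged() above.
    long queryTime  = 1523923643000L;             // ms: when the cached query was created
    int  eventTime  = 1523923650;                 // s: NotificationEvent.getEventTime()
    long updateTime = eventTime * 1000L;          // converted to ms before comparing
    boolean invalidated = (queryTime <= updateTime);  // true: table changed at or after query time
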
http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java
new file mode 100644
index 0000000..32e3308
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata.events;
+
+import java.util.function.Consumer;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public interface EventConsumer extends Configurable, Consumer<NotificationEvent> {
+
+}

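Because EventConsumer is simply Configurable plus Consumer<NotificationEvent>, additional handlers can be plugged in through hive.notification.event.consumers. A minimal sketch of a custom consumer (the class and its behavior are hypothetical, not part of this commit):

    package org.example;  // hypothetical package

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.ql.metadata.events.EventConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Logs every notification event it receives.
    public class LoggingEventConsumer implements EventConsumer {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingEventConsumer.class);
      private Configuration conf;

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public void accept(NotificationEvent event) {
        LOG.info("Saw notification event {} of type {}", event.getEventId(), event.getEventType());
      }
    }
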
http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
new file mode 100644
index 0000000..c35ca44
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata.events;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationEventPoll {
+  private static final Logger LOG = LoggerFactory.getLogger(NotificationEventPoll.class);
+  private static final AtomicBoolean inited = new AtomicBoolean(false);
+  private static NotificationEventPoll instance;
+
+  Configuration conf;
+  ScheduledExecutorService executorService;
+  List<EventConsumer> eventConsumers = new ArrayList<>();
+  ScheduledFuture<?> pollFuture;
+  long lastCheckedEventId;
+
+  public static void initialize(Configuration conf) throws Exception {
+    if (!inited.getAndSet(true)) {
+      try {
+        instance = new NotificationEventPoll(conf);
+      } catch (Exception err) {
+        inited.set(false);
+        throw err;
+      }
+    }
+  }
+
+  public static void shutdown() {
+    // Should only be called for testing.
+    if (inited.get()) {
+      instance.stop();
+      instance = null;
+      inited.set(false);
+    }
+  }
+
+  private NotificationEventPoll(Configuration conf) throws Exception {
+    this.conf = conf;
+
+    long pollInterval = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, TimeUnit.MILLISECONDS);
+    if (pollInterval <= 0) {
+      LOG.debug("Non-positive poll interval configured, notification event polling disabled");
+      return;
+    }
+
+    // Initialize the list of event handlers
+    String[] consumerClassNames =
+        conf.getStrings(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_CONSUMERS.varname);
+    if (consumerClassNames != null && consumerClassNames.length > 0) {
+      for (String consumerClassName : consumerClassNames) {
+        Class<?> consumerClass = JavaUtils.loadClass(consumerClassName);
+        EventConsumer consumer =
+            (EventConsumer) ReflectionUtils.newInstance(consumerClass, conf);
+        eventConsumers.add(consumer);
+      }
+    } else {
+      LOG.debug("No event consumers configured, notification event polling disabled");
+      return;
+    }
+
+    EventUtils.MSClientNotificationFetcher evFetcher
+        = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
+    lastCheckedEventId = evFetcher.getCurrentNotificationEventId();
+    LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId);
+
+    // Start the scheduled poll task
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("NotificationEventPoll %d")
+            .build();
+    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    pollFuture = executorService.scheduleAtFixedRate(new Poller(),
+        pollInterval, pollInterval, TimeUnit.MILLISECONDS);
+  }
+
+  private void stop() {
+    if (pollFuture != null) {
+      pollFuture.cancel(true);
+      pollFuture = null;
+    }
+    if (executorService != null) {
+      executorService.shutdown();
+      executorService = null;
+    }
+  }
+
+  class Poller implements Runnable {
+    @Override
+    public void run() {
+      LOG.debug("Polling for notification events");
+
+      int eventsProcessed = 0;
+      try {
+        // Get any new notification events that have occurred since the last time we checked,
+        // and pass them on to the event consumers.
+        EventUtils.MSClientNotificationFetcher evFetcher
+            = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
+        EventUtils.NotificationEventIterator evIter =
+            new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null);
+
+        while (evIter.hasNext()) {
+          NotificationEvent event = evIter.next();
+          LOG.debug("Event: " + event);
+          for (EventConsumer eventConsumer : eventConsumers) {
+            try {
+              eventConsumer.accept(event);
+            } catch (Exception err) {
+              LOG.error("Error processing notification event " + event, err);
+            }
+          }
+          eventsProcessed++;
+          lastCheckedEventId = event.getEventId();
+        }
+      } catch (Exception err) {
+        LOG.error("Error polling for notification events", err);
+      }
+
+      LOG.debug("Processed {} notification events", eventsProcessed);
+    }
+  }
+}

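The poll is a process-wide singleton, so the intended lifecycle (mirrored by the HiveServer2 and QTestUtil hunks in this commit) is a single initialize() at startup and, in tests only, a shutdown() between runs. A minimal sketch, assuming a configured HiveConf is available:

    // NotificationEventPoll.initialize declares throws Exception.
    HiveConf conf = new HiveConf();
    NotificationEventPoll.initialize(conf);  // no-op if already initialized; starts the Poller if enabled
    // ... run queries; registered EventConsumers receive events as the log is polled ...
    NotificationEventPoll.shutdown();        // test-only teardown: cancels the poll task
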
http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index ebf89eb..88b5ed8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -14582,7 +14582,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private QueryResultsCache.QueryInfo createCacheQueryInfoForQuery(QueryResultsCache.LookupInfo lookupInfo) {
-    return new QueryResultsCache.QueryInfo(lookupInfo, queryState.getHiveOperation(),
+    long queryTime = SessionState.get().getQueryCurrentTimestamp().getTime();
+    return new QueryResultsCache.QueryInfo(queryTime, lookupInfo, queryState.getHiveOperation(),
         resultSchema, getTableAccessInfo(), getColumnAccessInfo(), inputs);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/test/queries/clientpositive/results_cache_invalidation2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_invalidation2.q b/ql/src/test/queries/clientpositive/results_cache_invalidation2.q
new file mode 100644
index 0000000..ceee78f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_invalidation2.q
@@ -0,0 +1,64 @@
+--! qt:dataset:src
+set hive.metastore.event.listeners=org.apache.hive.hcatalog.listener.DbNotificationListener;
+
+set hive.query.results.cache.enabled=true;
+-- Enable this setting when HIVE-18609 is in
+--set hive.query.results.cache.nontransactional.tables.enabled=true;
+
+set hive.fetch.task.conversion=more;
+-- Start polling the notification events
+set hive.notification.event.poll.interval=2s;
+select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll');
+
+create table tab1 stored as textfile as select * from src;
+insert into tab1 select * from src;
+
+create table tab2 (key string, value string) stored as textfile;
+insert into tab2 select * from src;
+
+-- Run queries which should be saved by the cache.
+select count(*) from tab1 where key > 0;
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key);
+select count(*) from tab2 where key > 0;
+
+set test.comment="Cached entry should be used";
+set test.comment;
+explain
+select count(*) from tab1 where key > 0;
+select count(*) from tab1 where key > 0;
+
+set test.comment="Cached entry should be used";
+set test.comment;
+explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key);
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key);
+
+set test.comment="Cached entry should be used";
+set test.comment;
+explain
+select count(*) from tab2 where key > 0;
+select count(*) from tab2 where key > 0;
+
+-- Update tab1
+insert into tab1 select * from src;
+
+-- Run a query long enough that the invalidation check can run.
+select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint));
+
+set test.comment="Cached entry should be invalidated - query should not use cache";
+set test.comment;
+explain
+select count(*) from tab1 where key > 0;
+select count(*) from tab1 where key > 0;
+
+set test.comment="Cached entry should be invalidated - query should not use cache";
+set test.comment;
+explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key);
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key);
+
+set test.comment="tab2 was not modified, this query should still use cache";
+set test.comment;
+explain
+select count(*) from tab2 where key > 0;
+select count(*) from tab2 where key > 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out b/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out
new file mode 100644
index 0000000..5e19cd2
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out
@@ -0,0 +1,380 @@
+PREHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+null
+PREHOOK: query: create table tab1 stored as textfile as select * from src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab1
+POSTHOOK: query: create table tab1 stored as textfile as select * from src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab1
+POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert into tab1 select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tab1
+POSTHOOK: query: insert into tab1 select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tab1
+POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table tab2 (key string, value string) stored as textfile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab2
+POSTHOOK: query: create table tab2 (key string, value string) stored as textfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab2
+PREHOOK: query: insert into tab2 select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tab2
+POSTHOOK: query: insert into tab2 select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tab2
+POSTHOOK: Lineage: tab2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+#### A masked pattern was here ####
+994
+PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+2056
+PREHOOK: query: select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+497
+test.comment="Cached entry should be used"
+PREHOOK: query: explain
+select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+POSTHOOK: query: select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+994
+test.comment="Cached entry should be used"
+PREHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+PREHOOK: Input: default@tab2
+POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+POSTHOOK: Input: default@tab2
+2056
+test.comment="Cached entry should be used"
+PREHOOK: query: explain
+select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+POSTHOOK: query: select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+497
+PREHOOK: query: insert into tab1 select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tab1
+POSTHOOK: query: insert into tab1 select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tab1
+POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+null
+test.comment="Cached entry should be invalidated - query should not use cache"
+PREHOOK: query: explain
+select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: tab1
+                  Statistics: Num rows: 1500 Data size: 262384 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) > 0.0D) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 87461 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      Statistics: Num rows: 500 Data size: 87461 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: count()
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+#### A masked pattern was here ####
+1491
+test.comment="Cached entry should be invalidated - query should not use cache"
+PREHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: tab1
+                  Statistics: Num rows: 1500 Data size: 262384 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1425 Data size: 249264 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1425 Data size: 249264 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 1425 Data size: 249264 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: tab2
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                Statistics: Num rows: 1567 Data size: 274190 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+3084
+test.comment="tab2 was not modified, this query should still use cache"
+PREHOOK: query: explain
+select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+POSTHOOK: query: select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+497

http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out b/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out
new file mode 100644
index 0000000..50c8adf
--- /dev/null
+++ b/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out
@@ -0,0 +1,373 @@
+PREHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+null
+PREHOOK: query: create table tab1 stored as textfile as select * from src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab1
+POSTHOOK: query: create table tab1 stored as textfile as select * from src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab1
+POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert into tab1 select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tab1
+POSTHOOK: query: insert into tab1 select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tab1
+POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table tab2 (key string, value string) stored as textfile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab2
+POSTHOOK: query: create table tab2 (key string, value string) stored as textfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab2
+PREHOOK: query: insert into tab2 select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tab2
+POSTHOOK: query: insert into tab2 select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tab2
+POSTHOOK: Lineage: tab2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+#### A masked pattern was here ####
+994
+PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+2056
+PREHOOK: query: select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+497
+test.comment="Cached entry should be used"
+PREHOOK: query: explain
+select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+#### A masked pattern was here ####
+994
+test.comment="Cached entry should be used"
+PREHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+2056
+test.comment="Cached entry should be used"
+PREHOOK: query: explain
+select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+497
+PREHOOK: query: insert into tab1 select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tab1
+POSTHOOK: query: insert into tab1 select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tab1
+POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+null
+test.comment="Cached entry should be invalidated - query should not use cache"
+PREHOOK: query: explain
+select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: tab1
+            Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (UDFToDouble(key) > 0.0D) (type: boolean)
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from tab1 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+#### A masked pattern was here ####
+1491
+test.comment="Cached entry should be invalidated - query should not use cache"
+PREHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: tab1
+            Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE
+          TableScan
+            alias: tab2
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          keys:
+            0 _col0 (type: string)
+            1 _col0 (type: string)
+          Statistics: Num rows: 1650 Data size: 17529 Basic stats: COMPLETE Column stats: NONE
+          Group By Operator
+            aggregations: count()
+            mode: hash
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order: 
+              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col0 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab1
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab1
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+3084
+test.comment="tab2 was not modified, this query should still use cache"
+PREHOOK: query: explain
+select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*) from tab2 where key > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from tab2 where key > 0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+#### A masked pattern was here ####
+497

http://git-wip-us.apache.org/repos/asf/hive/blob/28f7d19a/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 4e88565..1642357 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
+import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll;
 import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
@@ -255,6 +256,12 @@ public class HiveServer2 extends CompositeService {
       }
     }
 
+    try {
+      NotificationEventPoll.initialize(hiveConf);
+    } catch (Exception err) {
+      throw new RuntimeException("Error initializing notification event poll", err);
+    }
+
     wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim();
 
     this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
