Author: khorgath
Date: Wed Jan  7 20:15:10 2015
New Revision: 1650145

URL: http://svn.apache.org/r1650145
Log:
HIVE-9174 : Enable queuing of HCatalog notification events in metastore DB 
(Alan Gates via Sushanth Sowmyan)

Added:
    
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
    
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/
    
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CurrentNotificationEventId.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventRequest.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventResponse.java
    
hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java
    
hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationNextId.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
    hive/trunk/itests/hcatalog-unit/pom.xml
    hive/trunk/metastore/if/hive_metastore.thrift
    hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.cpp
    hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.h
    
hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore_server.skeleton.cpp
    hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
    hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java
    
hive/trunk/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
    hive/trunk/metastore/src/gen/thrift/gen-php/metastore/Types.php
    
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
    
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
    hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
    hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
    hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
    hive/trunk/metastore/src/model/package.jdo
    
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
    
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed 
Jan  7 20:15:10 2015
@@ -477,6 +477,9 @@ public class HiveConf extends Configurat
     METASTORE_PRE_EVENT_LISTENERS("hive.metastore.pre.event.listeners", "",
         "List of comma separated listeners for metastore events."),
     METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", "", ""),
+    
METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", 
"86400s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "time after which events will be removed from the database listener 
queue"),
     
METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks",
 false,
         "Should the metastore do authorization checks against the underlying 
storage (usually hdfs) \n" +
         "for operations like drop-partition (disallow the drop-partition if 
the user in\n" +

Added: 
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java?rev=1650145&view=auto
==============================================================================
--- 
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
 (added)
+++ 
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
 Wed Jan  7 20:15:10 2015
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of {@link 
org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
+ * stores events in the database.
+ *
+ * Design overview:  This listener takes any event, builds a 
NotificationEventResponse,
+ * and puts it on a queue.  There is a dedicated thread that reads entries 
from the queue and
+ * places them in the database.  The reason for doing it in a separate thread 
is that we want to
+ * avoid slowing down other metadata operations with the work of putting the 
notification into
+ * the database.  Also, occasionally the thread needs to clean the database of 
old records.  We
+ * definitely don't want to do that as part of another metadata operation.
+ */
+public class DbNotificationListener extends MetaStoreEventListener {
+
+  private static final Log LOG = 
LogFactory.getLog(DbNotificationListener.class.getName());
+  private static CleanerThread cleaner = null;
+
+  // This is the same object as super.conf, but it's convenient to keep a copy 
of it as a
+  // HiveConf rather than a Configuration.
+  private HiveConf hiveConf;
+  private MessageFactory msgFactory;
+  private RawStore rs;
+
+  private synchronized void init(HiveConf conf) {
+    try {
+      rs = RawStoreProxy.getProxy(conf, conf,
+          conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999);
+    } catch (MetaException e) {
+      LOG.error("Unable to connect to raw store, notifications will not be 
tracked", e);
+      rs = null;
+    }
+    if (cleaner == null && rs != null) {
+      cleaner = new CleanerThread(conf, rs);
+      cleaner.start();
+    }
+  }
+
+  public DbNotificationListener(Configuration config) {
+    super(config);
+    // The code in MetastoreUtils.getMetaStoreListeners() that calls this 
looks for a constructor
+    // with a Configuration parameter, so we have to declare config as 
Configuration.  But it
+    // actually passes a HiveConf, which we need.  So we'll do this ugly down 
cast.
+    hiveConf = (HiveConf)config;
+    init(hiveConf);
+    msgFactory = MessageFactory.getInstance();
+  }
+
+  /**
+   * @param tableEvent table event.
+   * @throws org.apache.hadoop.hive.metastore.api.MetaException
+   */
+  public void onConfigChange(ConfigChangeEvent tableEvent) throws 
MetaException {
+    String key = tableEvent.getKey();
+    if 
(key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) {
+      // This weirdness of setting it in our hiveConf and then reading back 
does two things.
+      // One, it handles the conversion of the TimeUnit.  Two, it keeps the 
value around for
+      // later in case we need it again.
+      hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.name(),
+          tableEvent.getNewValue());
+      
cleaner.setTimeToLive(hiveConf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL,
+          TimeUnit.SECONDS));
+    }
+  }
+
+  /**
+   * @param tableEvent table event.
+   * @throws MetaException
+   */
+  public void onCreateTable (CreateTableEvent tableEvent) throws MetaException 
{
+    Table t = tableEvent.getTable();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_CREATE_TABLE_EVENT, 
msgFactory.buildCreateTableMessage(t).toString());
+    event.setDbName(t.getDbName());
+    // Table name is not set in create table because this goes on the queue 
for the database the
+    // table is created in, not the (new) queue for the table itself.
+    enqueue(event);
+  }
+
+  /**
+   * @param tableEvent table event.
+   * @throws MetaException
+   */
+  public void onDropTable (DropTableEvent tableEvent)  throws MetaException {
+    Table t = tableEvent.getTable();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_DROP_TABLE_EVENT, 
msgFactory.buildDropTableMessage(t).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+    enqueue(event);
+  }
+
+  /**
+   * @param tableEvent alter table event
+   * @throws MetaException
+   */
+  public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
+    /*Table before = tableEvent.getOldTable();
+    Table after = tableEvent.getNewTable();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_ALTER_TABLE_EVENT,
+        msgFactory.buildAlterTableMessage(before, after).toString());
+    if (event != null) {
+      event.setDbName(after.getDbName());
+      event.setTableName(after.getTableName());
+      enqueue(event);
+    }*/
+    // TODO - once HIVE-9175 is committed
+  }
+
+  /**
+   * @param partitionEvent partition event
+   * @throws MetaException
+   */
+  public void onAddPartition (AddPartitionEvent partitionEvent)
+      throws MetaException {
+    Table t = partitionEvent.getTable();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_ADD_PARTITION_EVENT,
+        msgFactory.buildAddPartitionMessage(t, 
partitionEvent.getPartitions()).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+    enqueue(event);
+  }
+
+  /**
+   * @param partitionEvent partition event
+   * @throws MetaException
+   */
+  public void onDropPartition (DropPartitionEvent partitionEvent)  throws 
MetaException {
+    Table t = partitionEvent.getTable();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_DROP_PARTITION_EVENT,
+        msgFactory.buildDropPartitionMessage(t, 
partitionEvent.getPartition()).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+    enqueue(event);
+  }
+
+  /**
+   * @param partitionEvent partition event
+   * @throws MetaException
+   */
+  public void onAlterPartition (AlterPartitionEvent partitionEvent)  throws 
MetaException {
+    // TODO, MessageFactory doesn't support Alter Partition yet.
+  }
+
+  /**
+   * @param dbEvent database event
+   * @throws MetaException
+   */
+  public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws 
MetaException {
+    Database db = dbEvent.getDatabase();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_CREATE_DATABASE_EVENT,
+        msgFactory.buildCreateDatabaseMessage(db).toString());
+    // Database name is null for create database, because this doesn't belong 
to messages for
+    // that database.  Rather it belongs to system wide messages.  The db name 
is in the message,
+    // so listeners can determine it.
+    enqueue(event);
+  }
+
+  /**
+   * @param dbEvent database event
+   * @throws MetaException
+   */
+  public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+    Database db = dbEvent.getDatabase();
+    NotificationEvent event = new NotificationEvent(0, now(),
+        HCatConstants.HCAT_DROP_DATABASE_EVENT,
+        msgFactory.buildDropDatabaseMessage(db).toString());
+    event.setDbName(db.getName());
+    enqueue(event);
+  }
+
+  /**
+   * @param partSetDoneEvent
+   * @throws MetaException
+   */
+  public void onLoadPartitionDone(LoadPartitionDoneEvent partSetDoneEvent) 
throws MetaException {
+    // TODO, we don't support this, but we should, since users may create an 
empty partition and
+    // then load data into it.
+
+  }
+
+  private int now() {
+    long millis = System.currentTimeMillis();
+    millis /= 1000;
+    if (millis > Integer.MAX_VALUE) {
+      LOG.warn("We've passed max int value in seconds since the epoch, " +
+          "all notification times will be the same!");
+      return Integer.MAX_VALUE;
+    }
+    return (int)millis;
+  }
+
+  private void enqueue(NotificationEvent event) {
+    if (rs != null) {
+      rs.addNotificationEvent(event);
+    } else {
+      LOG.warn("Dropping event " + event + " since notification is not 
running.");
+    }
+  }
+
+  private static class CleanerThread extends Thread {
+    private RawStore rs;
+    private int ttl;
+
+
+    CleanerThread(HiveConf conf, RawStore rs) {
+      super("CleanerThread");
+      this.rs = rs;
+      
setTimeToLive(conf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL,
+          TimeUnit.SECONDS));
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        rs.cleanNotificationEvents(ttl);
+        try {
+          Thread.sleep(60000);
+        } catch (InterruptedException e) {
+          LOG.info("Cleaner thread sleep interupted", e);
+        }
+      }
+    }
+
+    public void setTimeToLive(long configTtl) {
+      if (configTtl > Integer.MAX_VALUE) ttl = Integer.MAX_VALUE;
+      else ttl = (int)configTtl;
+    }
+
+  }
+
+}

Modified: 
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- 
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
 (original)
+++ 
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
 Wed Jan  7 20:15:10 2015
@@ -19,13 +19,18 @@
 
 package org.apache.hive.hcatalog.messaging.json;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
+// TODO, once HIVE-9175 is committed
+// import org.apache.hive.hcatalog.messaging.AlterTableMessage;
 import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage;
 import org.apache.hive.hcatalog.messaging.CreateTableMessage;
 import org.apache.hive.hcatalog.messaging.DropDatabaseMessage;
@@ -42,6 +47,9 @@ import java.util.*;
  */
 public class JSONMessageFactory extends MessageFactory {
 
+  private static final Log LOG = 
LogFactory.getLog(JSONMessageFactory.class.getName());
+
+
   private static JSONMessageDeserializer deserializer = new 
JSONMessageDeserializer();
 
   @Override
@@ -62,31 +70,40 @@ public class JSONMessageFactory extends
   @Override
   public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
     return new JSONCreateDatabaseMessage(HCAT_SERVER_URL, 
HCAT_SERVICE_PRINCIPAL, db.getName(),
-        System.currentTimeMillis() / 1000);
+        now());
   }
 
   @Override
   public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
     return new JSONDropDatabaseMessage(HCAT_SERVER_URL, 
HCAT_SERVICE_PRINCIPAL, db.getName(),
-        System.currentTimeMillis() / 1000);
+        now());
   }
 
   @Override
   public CreateTableMessage buildCreateTableMessage(Table table) {
     return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, 
table.getDbName(),
-        table.getTableName(), System.currentTimeMillis()/1000);
+        table.getTableName(), now());
   }
 
+  // TODO Once HIVE-9175 is committed
+  /*
+  @Override
+  public AlterTableMessage buildAlterTableMessage(Table before, Table after) {
+    return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, 
before.getDbName(),
+        before.getTableName(), now());
+  }
+  */
+
   @Override
   public DropTableMessage buildDropTableMessage(Table table) {
     return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, 
table.getDbName(), table.getTableName(),
-        System.currentTimeMillis()/1000);
+        now());
   }
 
   @Override
   public AddPartitionMessage buildAddPartitionMessage(Table table, 
List<Partition> partitions) {
     return new JSONAddPartitionMessage(HCAT_SERVER_URL, 
HCAT_SERVICE_PRINCIPAL, table.getDbName(),
-        table.getTableName(), getPartitionKeyValues(table, partitions), 
System.currentTimeMillis()/1000);
+        table.getTableName(), getPartitionKeyValues(table, partitions), now());
   }
 
   @Override
@@ -100,8 +117,11 @@ public class JSONMessageFactory extends
   @Override
   public DropPartitionMessage buildDropPartitionMessage(Table table, Partition 
partition) {
     return new JSONDropPartitionMessage(HCAT_SERVER_URL, 
HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
-        partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, 
partition)),
-        System.currentTimeMillis()/1000);
+        partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, 
partition)), now());
+  }
+
+  private long now() {
+    return System.currentTimeMillis() / 1000;
   }
 
   private static Map<String, String> getPartitionKeyValues(Table table, 
Partition partition) {

Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: 
http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Wed Jan  7 20:15:10 2015
@@ -60,6 +60,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-server-extensions</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-hbase-handler</artifactId>
       <version>${project.version}</version>

Added: 
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java?rev=1650145&view=auto
==============================================================================
--- 
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
 (added)
+++ 
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
 Wed Jan  7 20:15:10 2015
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.hive.hcatalog.listener;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestDbNotificationListener {
+
+  private static final Log LOG = 
LogFactory.getLog(TestDbNotificationListener.class.getName());
+  private static Map<String, String> emptyParameters = new HashMap<String, 
String>();
+  private static IMetaStoreClient msClient;
+  private int startTime;
+  private long firstEventId;
+
+  @BeforeClass
+  public static void connectToMetastore() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+        DbNotificationListener.class.getName());
+    msClient = new HiveMetaStoreClient(conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    long now = System.currentTimeMillis() / 1000;
+    startTime = 0;
+    if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge");
+    else startTime = (int) now;
+    firstEventId = msClient.getCurrentNotificationEventId().getEventId();
+  }
+
+  @Test
+  public void createDatabase() throws Exception {
+    Database db = new Database("mydb", "no description", "file:/tmp", 
emptyParameters);
+    msClient.createDatabase(db);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 1, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, 
event.getEventType());
+    assertNull(event.getDbName());
+    assertNull(event.getTableName());
+    
assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\","
 +
+        "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}"));
+  }
+
+  @Test
+  public void dropDatabase() throws Exception {
+    Database db = new Database("dropdb", "no description", "file:/tmp", 
emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("dropdb");
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
+    assertEquals("dropdb", event.getDbName());
+    assertNull(event.getTableName());
+    
assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\","
 +
+        "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}"));
+  }
+
+  @Test
+  public void createTable() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("mytable", "default", "me", startTime, startTime, 
0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 1, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertNull(event.getTableName());
+    
assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\","
 +
+        
"\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}"));
+  }
+
+  // TODO After HIVE-9175 is committed
+  /*
+  @Test
+  public void alterTable() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("alttable", "default", "me", startTime, startTime, 
0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+
+    table = new Table("alttable", "default", "me", startTime, startTime + 1, 
0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.alter_table("default", "alttable", table);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertEquals("alttable", event.getTableName());
+    
assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\","
 +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," 
+
+        "\"timestamp\":[0-9]+}"));
+  }
+  */
+
+  @Test
+  public void dropTable() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", 
"output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("droptable", "default", "me", startTime, 
startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+    msClient.dropTable("default", "droptable");
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertEquals("droptable", event.getTableName());
+    
assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\","
 +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"droptable\",\"timestamp\":[0-9]+}"));
+  }
+
+  @Test
+  public void addPartition() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("addPartTable", "default", "me", startTime, startTime, 0, sd, partCols,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+
+    Partition partition = new Partition(Arrays.asList("today"), "default", "addPartTable",
+        startTime, startTime, sd, emptyParameters);
+    msClient.add_partition(partition);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize()); // one event for createTable, one for add_partition
+
+    NotificationEvent event = rsp.getEvents().get(1); // the add_partition event
+    assertEquals(firstEventId + 2, event.getEventId()); // event ids are assigned sequentially
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertEquals("addparttable", event.getTableName()); // expects the lower-cased form of "addPartTable"
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
+  }
+
+  @Test
+  public void dropPartition() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("dropPartTable", "default", "me", startTime, startTime, 0, sd, partCols,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+
+    Partition partition = new Partition(Arrays.asList("today"), "default", "dropPartTable",
+        startTime, startTime, sd, emptyParameters);
+    msClient.add_partition(partition);
+
+    msClient.dropPartition("default", "dropparttable", Arrays.asList("today"), false);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(3, rsp.getEventsSize()); // createTable + add_partition + dropPartition
+
+    NotificationEvent event = rsp.getEvents().get(2); // the dropPartition event
+    assertEquals(firstEventId + 3, event.getEventId()); // event ids are assigned sequentially
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertEquals("dropparttable", event.getTableName()); // expects the lower-cased form of "dropPartTable"
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"dropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
+  }
+
+  @Test
+  public void getOnlyMaxEvents() throws Exception {
+    Database db = new Database("db1", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    db = new Database("db2", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    db = new Database("db3", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 2, null);
+    assertEquals(2, rsp.getEventsSize()); // maxEvents=2 caps the response even though 3 events exist
+    assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId()); // oldest events returned first
+    assertEquals(firstEventId + 2, rsp.getEvents().get(1).getEventId());
+  }
+
+  @Test
+  public void filter() throws Exception {
+    Database db = new Database("f1", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    db = new Database("f2", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("f2");
+
+    // Accept only DROP_DATABASE events; the two CREATE_DATABASE events must be filtered out.
+    IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        return event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+      }
+    };
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, filter);
+    assertEquals(1, rsp.getEventsSize()); // only the dropDatabase("f2") event matches
+    assertEquals(firstEventId + 3, rsp.getEvents().get(0).getEventId());
+  }
+
+  @Test
+  public void filterWithMax() throws Exception {
+    Database db = new Database("f10", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    db = new Database("f11", "no description", "file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("f11");
+
+    // Accept only CREATE_DATABASE events; combined with maxEvents=1 below only the first matches.
+    IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        return event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+      }
+    };
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 1, filter);
+    assertEquals(1, rsp.getEventsSize()); // maxEvents=1 caps the two matching create events to one
+    assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
+  }
+}
\ No newline at end of file

Modified: hive/trunk/metastore/if/hive_metastore.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/if/hive_metastore.thrift?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- hive/trunk/metastore/if/hive_metastore.thrift (original)
+++ hive/trunk/metastore/if/hive_metastore.thrift Wed Jan  7 20:15:10 2015
@@ -642,6 +642,29 @@ struct ShowCompactResponse {
     1: required list<ShowCompactResponseElement> compacts,
 }
 
+struct NotificationEventRequest {
+    1: required i64 lastEvent, // id of the last event the caller has already seen; only later events are returned
+    2: optional i32 maxEvents, // cap on number of events returned; tests pass 0 to mean "no cap" -- TODO confirm server semantics
+}
+
+struct NotificationEvent {
+    1: required i64 eventId,      // sequentially assigned event id (see tests asserting firstEventId + n)
+    2: required i32 eventTime,    // event time -- presumably seconds since epoch; verify against server
+    3: required string eventType, // e.g. CREATE_TABLE, DROP_TABLE, ADD_PARTITION, DROP_PARTITION
+    4: optional string dbName,    // database the event applies to, when applicable
+    5: optional string tableName, // table the event applies to, when applicable
+    6: required string message,   // JSON payload describing the event (see regexes in the tests above)
+}
+
+struct NotificationEventResponse {
+    1: required list<NotificationEvent> events, // events after lastEvent, oldest first
+}
+
+struct CurrentNotificationEventId {
+    1: required i64 eventId, // highest event id currently recorded in the log
+}
+
+
 exception MetaException {
   1: string message
 }
@@ -1107,6 +1130,10 @@ service ThriftHiveMetastore extends fb30
   HeartbeatTxnRangeResponse heartbeat_txn_range(1:HeartbeatTxnRangeRequest txns)
   void compact(1:CompactionRequest rqst) 
   ShowCompactResponse show_compact(1:ShowCompactRequest rqst)
+
+  // Notification logging calls
+  NotificationEventResponse getNextNotification(1:NotificationEventRequest rqst) 
+  CurrentNotificationEventId getCurrentNotificationEventId()
 }
 
 // * Note about the DDL_TIME: When creating or altering a table or a partition,


Reply via email to