Author: khorgath
Date: Wed Jan 7 20:15:10 2015
New Revision: 1650145
URL: http://svn.apache.org/r1650145
Log:
HIVE-9174 : Enable queuing of HCatalog notification events in metastore DB
(Alan Gates via Sushanth Sowmyan)
Added:
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CurrentNotificationEventId.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEvent.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventRequest.java
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventResponse.java
hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationLog.java
hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MNotificationNextId.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
hive/trunk/itests/hcatalog-unit/pom.xml
hive/trunk/metastore/if/hive_metastore.thrift
hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.cpp
hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.h
hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore_server.skeleton.cpp
hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java
hive/trunk/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
hive/trunk/metastore/src/gen/thrift/gen-php/metastore/Types.php
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
hive/trunk/metastore/src/model/package.jdo
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed
Jan 7 20:15:10 2015
@@ -477,6 +477,9 @@ public class HiveConf extends Configurat
METASTORE_PRE_EVENT_LISTENERS("hive.metastore.pre.event.listeners", "",
"List of comma separated listeners for metastore events."),
METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", "", ""),
+
METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive",
"86400s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "time after which events will be removed from the database listener
queue"),
METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks",
false,
"Should the metastore do authorization checks against the underlying
storage (usually hdfs) \n" +
"for operations like drop-partition (disallow the drop-partition if
the user in\n" +
Added:
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java?rev=1650145&view=auto
==============================================================================
---
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
(added)
+++
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
Wed Jan 7 20:15:10 2015
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of {@link
org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
+ * stores events in the database.
+ *
+ * Design overview: This listener takes any event, builds a
NotificationEventResponse,
+ * and puts it on a queue. There is a dedicated thread that reads entries
from the queue and
+ * places them in the database. The reason for doing it in a separate thread
is that we want to
+ * avoid slowing down other metadata operations with the work of putting the
notification into
+ * the database. Also, occasionally the thread needs to clean the database of
old records. We
+ * definitely don't want to do that as part of another metadata operation.
+ */
+public class DbNotificationListener extends MetaStoreEventListener {
+
+ private static final Log LOG =
LogFactory.getLog(DbNotificationListener.class.getName());
+ private static CleanerThread cleaner = null;
+
+ // This is the same object as super.conf, but it's convenient to keep a copy
of it as a
+ // HiveConf rather than a Configuration.
+ private HiveConf hiveConf;
+ private MessageFactory msgFactory;
+ private RawStore rs;
+
+ private synchronized void init(HiveConf conf) {
+ try {
+ rs = RawStoreProxy.getProxy(conf, conf,
+ conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999);
+ } catch (MetaException e) {
+ LOG.error("Unable to connect to raw store, notifications will not be
tracked", e);
+ rs = null;
+ }
+ if (cleaner == null && rs != null) {
+ cleaner = new CleanerThread(conf, rs);
+ cleaner.start();
+ }
+ }
+
+ public DbNotificationListener(Configuration config) {
+ super(config);
+ // The code in MetastoreUtils.getMetaStoreListeners() that calls this
looks for a constructor
+ // with a Configuration parameter, so we have to declare config as
Configuration. But it
+ // actually passes a HiveConf, which we need. So we'll do this ugly down
cast.
+ hiveConf = (HiveConf)config;
+ init(hiveConf);
+ msgFactory = MessageFactory.getInstance();
+ }
+
+ /**
+ * @param tableEvent table event.
+ * @throws org.apache.hadoop.hive.metastore.api.MetaException
+ */
+ public void onConfigChange(ConfigChangeEvent tableEvent) throws
MetaException {
+ String key = tableEvent.getKey();
+ if
(key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) {
+ // This weirdness of setting it in our hiveConf and then reading back
does two things.
+ // One, it handles the conversion of the TimeUnit. Two, it keeps the
value around for
+ // later in case we need it again.
+ hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.name(),
+ tableEvent.getNewValue());
+
cleaner.setTimeToLive(hiveConf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ }
+ }
+
+ /**
+ * @param tableEvent table event.
+ * @throws MetaException
+ */
+ public void onCreateTable (CreateTableEvent tableEvent) throws MetaException
{
+ Table t = tableEvent.getTable();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_CREATE_TABLE_EVENT,
msgFactory.buildCreateTableMessage(t).toString());
+ event.setDbName(t.getDbName());
+ // Table name is not set in create table because this goes on the queue
for the database the
+ // table is created in, not the (new) queue for the table itself.
+ enqueue(event);
+ }
+
+ /**
+ * @param tableEvent table event.
+ * @throws MetaException
+ */
+ public void onDropTable (DropTableEvent tableEvent) throws MetaException {
+ Table t = tableEvent.getTable();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_DROP_TABLE_EVENT,
msgFactory.buildDropTableMessage(t).toString());
+ event.setDbName(t.getDbName());
+ event.setTableName(t.getTableName());
+ enqueue(event);
+ }
+
+ /**
+ * @param tableEvent alter table event
+ * @throws MetaException
+ */
+ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
+ /*Table before = tableEvent.getOldTable();
+ Table after = tableEvent.getNewTable();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_ALTER_TABLE_EVENT,
+ msgFactory.buildAlterTableMessage(before, after).toString());
+ if (event != null) {
+ event.setDbName(after.getDbName());
+ event.setTableName(after.getTableName());
+ enqueue(event);
+ }*/
+ // TODO - once HIVE-9175 is committed
+ }
+
+ /**
+ * @param partitionEvent partition event
+ * @throws MetaException
+ */
+ public void onAddPartition (AddPartitionEvent partitionEvent)
+ throws MetaException {
+ Table t = partitionEvent.getTable();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_ADD_PARTITION_EVENT,
+ msgFactory.buildAddPartitionMessage(t,
partitionEvent.getPartitions()).toString());
+ event.setDbName(t.getDbName());
+ event.setTableName(t.getTableName());
+ enqueue(event);
+ }
+
+ /**
+ * @param partitionEvent partition event
+ * @throws MetaException
+ */
+ public void onDropPartition (DropPartitionEvent partitionEvent) throws
MetaException {
+ Table t = partitionEvent.getTable();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_DROP_PARTITION_EVENT,
+ msgFactory.buildDropPartitionMessage(t,
partitionEvent.getPartition()).toString());
+ event.setDbName(t.getDbName());
+ event.setTableName(t.getTableName());
+ enqueue(event);
+ }
+
+ /**
+ * @param partitionEvent partition event
+ * @throws MetaException
+ */
+ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws
MetaException {
+ // TODO, MessageFactory doesn't support Alter Partition yet.
+ }
+
+ /**
+ * @param dbEvent database event
+ * @throws MetaException
+ */
+ public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws
MetaException {
+ Database db = dbEvent.getDatabase();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_CREATE_DATABASE_EVENT,
+ msgFactory.buildCreateDatabaseMessage(db).toString());
+ // Database name is null for create database, because this doesn't belong
to messages for
+ // that database. Rather it belongs to system wide messages. The db name
is in the message,
+ // so listeners can determine it.
+ enqueue(event);
+ }
+
+ /**
+ * @param dbEvent database event
+ * @throws MetaException
+ */
+ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+ Database db = dbEvent.getDatabase();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_DROP_DATABASE_EVENT,
+ msgFactory.buildDropDatabaseMessage(db).toString());
+ event.setDbName(db.getName());
+ enqueue(event);
+ }
+
+ /**
+ * @param partSetDoneEvent
+ * @throws MetaException
+ */
+ public void onLoadPartitionDone(LoadPartitionDoneEvent partSetDoneEvent)
throws MetaException {
+ // TODO, we don't support this, but we should, since users may create an
empty partition and
+ // then load data into it.
+
+ }
+
+ private int now() {
+ long millis = System.currentTimeMillis();
+ millis /= 1000;
+ if (millis > Integer.MAX_VALUE) {
+ LOG.warn("We've passed max int value in seconds since the epoch, " +
+ "all notification times will be the same!");
+ return Integer.MAX_VALUE;
+ }
+ return (int)millis;
+ }
+
+ private void enqueue(NotificationEvent event) {
+ if (rs != null) {
+ rs.addNotificationEvent(event);
+ } else {
+ LOG.warn("Dropping event " + event + " since notification is not
running.");
+ }
+ }
+
+ private static class CleanerThread extends Thread {
+ private RawStore rs;
+ private int ttl;
+
+
+ CleanerThread(HiveConf conf, RawStore rs) {
+ super("CleanerThread");
+ this.rs = rs;
+
setTimeToLive(conf.getTimeVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ rs.cleanNotificationEvents(ttl);
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ LOG.info("Cleaner thread sleep interupted", e);
+ }
+ }
+ }
+
+ public void setTimeToLive(long configTtl) {
+ if (configTtl > Integer.MAX_VALUE) ttl = Integer.MAX_VALUE;
+ else ttl = (int)configTtl;
+ }
+
+ }
+
+}
Modified:
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
---
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
(original)
+++
hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
Wed Jan 7 20:15:10 2015
@@ -19,13 +19,18 @@
package org.apache.hive.hcatalog.messaging.json;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
+// TODO, once HIVE-9175 is committed
+// import org.apache.hive.hcatalog.messaging.AlterTableMessage;
import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage;
import org.apache.hive.hcatalog.messaging.CreateTableMessage;
import org.apache.hive.hcatalog.messaging.DropDatabaseMessage;
@@ -42,6 +47,9 @@ import java.util.*;
*/
public class JSONMessageFactory extends MessageFactory {
+ private static final Log LOG =
LogFactory.getLog(JSONMessageFactory.class.getName());
+
+
private static JSONMessageDeserializer deserializer = new
JSONMessageDeserializer();
@Override
@@ -62,31 +70,40 @@ public class JSONMessageFactory extends
@Override
public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
return new JSONCreateDatabaseMessage(HCAT_SERVER_URL,
HCAT_SERVICE_PRINCIPAL, db.getName(),
- System.currentTimeMillis() / 1000);
+ now());
}
@Override
public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
return new JSONDropDatabaseMessage(HCAT_SERVER_URL,
HCAT_SERVICE_PRINCIPAL, db.getName(),
- System.currentTimeMillis() / 1000);
+ now());
}
@Override
public CreateTableMessage buildCreateTableMessage(Table table) {
return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
table.getDbName(),
- table.getTableName(), System.currentTimeMillis()/1000);
+ table.getTableName(), now());
}
+ // TODO Once HIVE-9175 is committed
+ /*
+ @Override
+ public AlterTableMessage buildAlterTableMessage(Table before, Table after) {
+ return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
before.getDbName(),
+ before.getTableName(), now());
+ }
+ */
+
@Override
public DropTableMessage buildDropTableMessage(Table table) {
return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
table.getDbName(), table.getTableName(),
- System.currentTimeMillis()/1000);
+ now());
}
@Override
public AddPartitionMessage buildAddPartitionMessage(Table table,
List<Partition> partitions) {
return new JSONAddPartitionMessage(HCAT_SERVER_URL,
HCAT_SERVICE_PRINCIPAL, table.getDbName(),
- table.getTableName(), getPartitionKeyValues(table, partitions),
System.currentTimeMillis()/1000);
+ table.getTableName(), getPartitionKeyValues(table, partitions), now());
}
@Override
@@ -100,8 +117,11 @@ public class JSONMessageFactory extends
@Override
public DropPartitionMessage buildDropPartitionMessage(Table table, Partition
partition) {
return new JSONDropPartitionMessage(HCAT_SERVER_URL,
HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
- partition.getTableName(), Arrays.asList(getPartitionKeyValues(table,
partition)),
- System.currentTimeMillis()/1000);
+ partition.getTableName(), Arrays.asList(getPartitionKeyValues(table,
partition)), now());
+ }
+
+ private long now() {
+ return System.currentTimeMillis() / 1000;
}
private static Map<String, String> getPartitionKeyValues(Table table,
Partition partition) {
Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL:
http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Wed Jan 7 20:15:10 2015
@@ -60,6 +60,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-server-extensions</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<version>${project.version}</version>
Added:
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
URL:
http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java?rev=1650145&view=auto
==============================================================================
---
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
(added)
+++
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
Wed Jan 7 20:15:10 2015
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.hive.hcatalog.listener;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestDbNotificationListener {
+
+ private static final Log LOG =
LogFactory.getLog(TestDbNotificationListener.class.getName());
+ private static Map<String, String> emptyParameters = new HashMap<String,
String>();
+ private static IMetaStoreClient msClient;
+ private int startTime;
+ private long firstEventId;
+
+ @BeforeClass
+ public static void connectToMetastore() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+ DbNotificationListener.class.getName());
+ msClient = new HiveMetaStoreClient(conf);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ long now = System.currentTimeMillis() / 1000;
+ startTime = 0;
+ if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge");
+ else startTime = (int) now;
+ firstEventId = msClient.getCurrentNotificationEventId().getEventId();
+ }
+
+ @Test
+ public void createDatabase() throws Exception {
+ Database db = new Database("mydb", "no description", "file:/tmp",
emptyParameters);
+ msClient.createDatabase(db);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 1, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT,
event.getEventType());
+ assertNull(event.getDbName());
+ assertNull(event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\","
+
+ "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}"));
+ }
+
+ @Test
+ public void dropDatabase() throws Exception {
+ Database db = new Database("dropdb", "no description", "file:/tmp",
emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("dropdb");
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(2, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
+ assertEquals("dropdb", event.getDbName());
+ assertNull(event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\","
+
+ "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}"));
+ }
+
+ @Test
+ public void createTable() throws Exception {
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input",
"output", false, 0,
+ serde, null, null, emptyParameters);
+ Table table = new Table("mytable", "default", "me", startTime, startTime,
0, sd, null,
+ emptyParameters, null, null, null);
+ msClient.createTable(table);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 1, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertNull(event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\","
+
+
"\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}"));
+ }
+
+ // TODO After HIVE-9175 is committed
+ /*
+ @Test
+ public void alterTable() throws Exception {
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input",
"output", false, 0,
+ serde, null, null, emptyParameters);
+ Table table = new Table("alttable", "default", "me", startTime, startTime,
0, sd, null,
+ emptyParameters, null, null, null);
+ msClient.createTable(table);
+
+ table = new Table("alttable", "default", "me", startTime, startTime + 1,
0, sd, null,
+ emptyParameters, null, null, null);
+ msClient.alter_table("default", "alttable", table);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(2, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertEquals("alttable", event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\","
+
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\","
+
+ "\"timestamp\":[0-9]+}"));
+ }
+ */
+
+ @Test
+ public void dropTable() throws Exception {
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input",
"output", false, 0,
+ serde, null, null, emptyParameters);
+ Table table = new Table("droptable", "default", "me", startTime,
startTime, 0, sd, null,
+ emptyParameters, null, null, null);
+ msClient.createTable(table);
+ msClient.dropTable("default", "droptable");
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(2, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertEquals("droptable", event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\","
+
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+ "\"droptable\",\"timestamp\":[0-9]+}"));
+ }
+
+ @Test
+ public void addPartition() throws Exception {
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input",
"output", false, 0,
+ serde, null, null, emptyParameters);
+ Table table = new Table("addPartTable", "default", "me", startTime,
startTime, 0, sd, partCols,
+ emptyParameters, null, null, null);
+ msClient.createTable(table);
+
+ Partition partition = new Partition(Arrays.asList("today"), "default",
"addPartTable",
+ startTime, startTime, sd, emptyParameters);
+ msClient.add_partition(partition);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(2, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertEquals("addparttable", event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\","
+
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+
"\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
+ }
+
+ @Test
+ public void dropPartition() throws Exception {
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input",
"output", false, 0,
+ serde, null, null, emptyParameters);
+ Table table = new Table("dropPartTable", "default", "me", startTime,
startTime, 0, sd, partCols,
+ emptyParameters, null, null, null);
+ msClient.createTable(table);
+
+ Partition partition = new Partition(Arrays.asList("today"), "default",
"dropPartTable",
+ startTime, startTime, sd, emptyParameters);
+ msClient.add_partition(partition);
+
+ msClient.dropPartition("default", "dropparttable", Arrays.asList("today"),
false);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, null);
+ assertEquals(3, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(2);
+ assertEquals(firstEventId + 3, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT,
event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertEquals("dropparttable", event.getTableName());
+
assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\","
+
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+
"\"dropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
+ }
+
+ @Test
+ public void getOnlyMaxEvents() throws Exception {
+ Database db = new Database("db1", "no description", "file:/tmp",
emptyParameters);
+ msClient.createDatabase(db);
+ db = new Database("db2", "no description", "file:/tmp", emptyParameters);
+ msClient.createDatabase(db);
+ db = new Database("db3", "no description", "file:/tmp", emptyParameters);
+ msClient.createDatabase(db);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
2, null);
+ assertEquals(2, rsp.getEventsSize());
+ assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
+ assertEquals(firstEventId + 2, rsp.getEvents().get(1).getEventId());
+ }
+
+ @Test
+ public void filter() throws Exception {
+ Database db = new Database("f1", "no description", "file:/tmp",
emptyParameters);
+ msClient.createDatabase(db);
+ db = new Database("f2", "no description", "file:/tmp", emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("f2");
+
+ IMetaStoreClient.NotificationFilter filter = new
IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ return
event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ }
+ };
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
0, filter);
+ assertEquals(1, rsp.getEventsSize());
+ assertEquals(firstEventId + 3, rsp.getEvents().get(0).getEventId());
+ }
+
+ @Test
+ public void filterWithMax() throws Exception {
+ Database db = new Database("f10", "no description", "file:/tmp",
emptyParameters);
+ msClient.createDatabase(db);
+ db = new Database("f11", "no description", "file:/tmp", emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("f11");
+
+ IMetaStoreClient.NotificationFilter filter = new
IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ return
event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+ }
+ };
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId,
1, filter);
+ assertEquals(1, rsp.getEventsSize());
+ assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
+ }
+}
\ No newline at end of file
Modified: hive/trunk/metastore/if/hive_metastore.thrift
URL:
http://svn.apache.org/viewvc/hive/trunk/metastore/if/hive_metastore.thrift?rev=1650145&r1=1650144&r2=1650145&view=diff
==============================================================================
--- hive/trunk/metastore/if/hive_metastore.thrift (original)
+++ hive/trunk/metastore/if/hive_metastore.thrift Wed Jan 7 20:15:10 2015
@@ -642,6 +642,29 @@ struct ShowCompactResponse {
1: required list<ShowCompactResponseElement> compacts,
}
+struct NotificationEventRequest {
+ 1: required i64 lastEvent,
+ 2: optional i32 maxEvents,
+}
+
+struct NotificationEvent {
+ 1: required i64 eventId,
+ 2: required i32 eventTime,
+ 3: required string eventType,
+ 4: optional string dbName,
+ 5: optional string tableName,
+ 6: required string message,
+}
+
+struct NotificationEventResponse {
+ 1: required list<NotificationEvent> events,
+}
+
+struct CurrentNotificationEventId {
+ 1: required i64 eventId,
+}
+
+
exception MetaException {
1: string message
}
@@ -1107,6 +1130,10 @@ service ThriftHiveMetastore extends fb30
HeartbeatTxnRangeResponse heartbeat_txn_range(1:HeartbeatTxnRangeRequest
txns)
void compact(1:CompactionRequest rqst)
ShowCompactResponse show_compact(1:ShowCompactRequest rqst)
+
+ // Notification logging calls
+ NotificationEventResponse getNextNotification(1:NotificationEventRequest
rqst)
+ CurrentNotificationEventId getCurrentNotificationEventId()
}
// * Note about the DDL_TIME: When creating or altering a table or a partition,