Repository: hive
Updated Branches:
  refs/heads/master 2985262b8 -> aa29cd9d6


http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
new file mode 100644
index 0000000..20011cc
--- /dev/null
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+
+/**
+ * This class is used to notify a list of listeners about specific MetaStore 
events.
+ */
+@Private
+public class MetaStoreListenerNotifier {
+  private interface EventNotifier {
+    void notify(MetaStoreEventListener listener, ListenerEvent event) throws 
MetaException;
+  }
+
+  private static Map<EventType, EventNotifier> notificationEvents = 
Maps.newHashMap(
+      ImmutableMap.<EventType, EventNotifier>builder()
+          .put(EventType.CREATE_DATABASE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onCreateDatabase((CreateDatabaseEvent)event);
+            }
+          })
+          .put(EventType.DROP_DATABASE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onDropDatabase((DropDatabaseEvent)event);
+            }
+          })
+          .put(EventType.CREATE_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onCreateTable((CreateTableEvent)event);
+            }
+          })
+          .put(EventType.DROP_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onDropTable((DropTableEvent)event);
+            }
+          })
+          .put(EventType.ADD_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onAddPartition((AddPartitionEvent)event);
+            }
+          })
+          .put(EventType.DROP_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onDropPartition((DropPartitionEvent)event);
+            }
+          })
+          .put(EventType.ALTER_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onAlterTable((AlterTableEvent)event);
+            }
+          })
+          .put(EventType.ALTER_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onAlterPartition((AlterPartitionEvent)event);
+            }
+          })
+          .put(EventType.INSERT, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onInsert((InsertEvent)event);
+            }
+          })
+          .put(EventType.CREATE_FUNCTION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onCreateFunction((CreateFunctionEvent)event);
+            }
+          })
+          .put(EventType.DROP_FUNCTION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onDropFunction((DropFunctionEvent)event);
+            }
+          })
+          .put(EventType.CREATE_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onAddIndex((AddIndexEvent)event);
+            }
+          })
+          .put(EventType.DROP_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onDropIndex((DropIndexEvent)event);
+            }
+          })
+          .put(EventType.ALTER_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent 
event) throws MetaException {
+              listener.onAlterIndex((AlterIndexEvent)event);
+            }
+          })
+          .build()
+  );
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each 
listener notified might update
+   * the (ListenerEvent) event by setting a parameter key/value pair. These 
updated parameters will
+   * be returned to the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @return A list of key/value pair parameters that the listeners set. The 
returned object will return an empty
+   *         map if no parameters were updated or if no listeners were 
notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> 
listeners,
+                                                EventType eventType,
+                                                ListenerEvent event) throws 
MetaException {
+
+    Preconditions.checkNotNull(listeners, "Listeners must not be null.");
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    for (MetaStoreEventListener listener : listeners) {
+      notificationEvents.get(eventType).notify(listener, event);
+    }
+
+    // Each listener called above might set a different parameter on the event.
+    // This write permission is allowed on the listener side to avoid breaking 
compatibility if we change the API
+    // method calls.
+    return event.getParameters();
+  }
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each 
listener notified might update
+   * the (ListenerEvent) event by setting a parameter key/value pair. These 
updated parameters will
+   * be returned to the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @param environmentContext An EnvironmentContext object with parameters 
sent by the HMS client.
+   * @return A list of key/value pair parameters that the listeners set. The 
returned object will return an empty
+   *         map if no parameters were updated or if no listeners were 
notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> 
listeners,
+                                                EventType eventType,
+                                                ListenerEvent event,
+                                                EnvironmentContext 
environmentContext) throws MetaException {
+
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    event.setEnvironmentContext(environmentContext);
+    return notifyEvent(listeners, eventType, event);
+  }
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each 
listener notified might update
+   * the (ListenerEvent) event by setting a parameter key/value pair. These 
updated parameters will
+   * be returned to the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @param environmentContext An EnvironmentContext object with parameters 
sent by the HMS client.
+   * @param parameters A list of key/value pairs with the new parameters to 
add.
+   * @return A list of key/value pair parameters that the listeners set. The 
returned object will return an empty
+   *         map if no parameters were updated or if no listeners were 
notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> 
listeners,
+                                                EventType eventType,
+                                                ListenerEvent event,
+                                                EnvironmentContext 
environmentContext,
+                                                Map<String, String> 
parameters) throws MetaException {
+
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    event.putParameters(parameters);
+    return notifyEvent(listeners, eventType, event, environmentContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
index 62aeb8c..b741549 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
@@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for all the events which are defined for metastore.
+ *
+ * This class is not thread-safe and not expected to be called in parallel.
  */
 
+@NotThreadSafe
 public abstract class ListenerEvent {
 
   /**
@@ -33,6 +41,26 @@ public abstract class ListenerEvent {
   private final boolean status;
   private final HMSHandler handler;
 
+  /**
+   * Key/value parameters used by listeners to store notifications results
+   * i.e. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID.
+   *
+   * NotThreadSafe: The parameters map is not expected to be access in 
parallel by Hive, so keep it thread-unsafe
+   * to avoid locking overhead.
+   */
+  private Map<String, String> parameters;
+
+  /** For performance concerns, it is preferable to cache the unmodifiable 
parameters variable that will be returned on the
+   * {@link #getParameters()} method. It is expected that {@link 
#putParameter(String, String)} is called less times
+   * than {@link #getParameters()}, so performance may be better by using this 
cache.
+   */
+  private Map<String, String> unmodifiableParameters;
+
+  // Listener parameters aren't expected to have many values. So far only
+  // DbNotificationListener will add a parameter; let's set a low initial 
capacity for now.
+  // If we find out many parameters are added, then we can adjust or remove 
this initial capacity.
+  private static final int PARAMETERS_INITIAL_CAPACITY = 1;
+
   // Properties passed by the client, to be used in execution hooks.
   private EnvironmentContext environmentContext = null;
 
@@ -40,6 +68,8 @@ public abstract class ListenerEvent {
     super();
     this.status = status;
     this.handler = handler;
+    this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY);
+    updateUnmodifiableParameters();
   }
 
   /**
@@ -49,6 +79,12 @@ public abstract class ListenerEvent {
     return status;
   }
 
+  /**
+   * Set the environment context of the event.
+   *
+   * @param environmentContext An EnvironmentContext object that contains 
environment parameters sent from
+   *                           the HMS client.
+   */
   public void setEnvironmentContext(EnvironmentContext environmentContext) {
     this.environmentContext = environmentContext;
   }
@@ -66,4 +102,74 @@ public abstract class ListenerEvent {
   public HMSHandler getHandler() {
     return handler;
   }
+
+  /**
+   * Return all parameters of the listener event. Parameters are read-only 
(unmodifiable map). If a new parameter
+   * must be added, please use the putParameter() method.
+   *
+   *
+   * @return A map object with all parameters.
+   */
+  public final Map<String, String> getParameters() {
+    return unmodifiableParameters;
+  }
+
+  /**
+   * Put a new parameter to the listener event.
+   *
+   * Overridden parameters is not allowed, and an exception may be thrown to 
avoid a mis-configuration
+   * between listeners setting the same parameters.
+   *
+   * @param name Name of the parameter.
+   * @param value Value of the parameter.
+   * @throws IllegalStateException if a parameter already exists.
+   */
+  public void putParameter(String name, String value) {
+    putParameterIfAbsent(name, value);
+    updateUnmodifiableParameters();
+  }
+
+  /**
+   * Put a new set the parameters to the listener event.
+   *
+   * Overridden parameters is not allowed, and an exception may be thrown to 
avoid a mis-configuration
+   * between listeners setting the same parameters.
+   *
+   * @param parameters A Map object with the a set of parameters.
+   * @throws IllegalStateException if a parameter already exists.
+   */
+  public void putParameters(final Map<String, String> parameters) {
+    if (parameters != null) {
+      for (Map.Entry<String, String> entry : parameters.entrySet()) {
+        putParameterIfAbsent(entry.getKey(), entry.getValue());
+      }
+
+      updateUnmodifiableParameters();
+    }
+  }
+
+  /**
+   * Put a parameter to the listener event only if the parameter is absent.
+   *
+   * Overridden parameters is not allowed, and an exception may be thrown to 
avoid a mis-configuration
+   * between listeners setting the same parameters.
+   *
+   * @param name Name of the parameter.
+   * @param value Value of the parameter.
+   * @throws IllegalStateException if a parameter already exists.
+   */
+  private void putParameterIfAbsent(String name, String value) {
+    if (parameters.containsKey(name)) {
+      throw new IllegalStateException("Invalid attempt to overwrite a 
read-only parameter: " + name);
+    }
+
+    parameters.put(name, value);
+  }
+
+  /**
+   * Keeps a cache of unmodifiable parameters returned by the getParameters() 
method.
+   */
+  private void updateUnmodifiableParameters() {
+    unmodifiableParameters = Collections.unmodifiableMap(parameters);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java 
b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 1f87eeb..9b8eaf2 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
 import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
 import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -37,6 +38,9 @@ import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -116,6 +121,51 @@ public class TestObjectStore {
   }
 
   /**
+   * Test notification operations
+   */
+  @Test
+  public void testNotificationOps() throws InterruptedException {
+    final int NO_EVENT_ID = 0;
+    final int FIRST_EVENT_ID = 1;
+    final int SECOND_EVENT_ID = 2;
+
+    NotificationEvent event =
+        new NotificationEvent(0, 0, 
EventMessage.EventType.CREATE_DATABASE.toString(), "");
+    NotificationEventResponse eventResponse;
+    CurrentNotificationEventId eventId;
+
+    // Verify that there is no notifications available yet
+    eventId = objectStore.getCurrentNotificationEventId();
+    Assert.assertEquals(NO_EVENT_ID, eventId.getEventId());
+
+    // Verify that addNotificationEvent() updates the NotificationEvent with 
the new event ID
+    objectStore.addNotificationEvent(event);
+    Assert.assertEquals(FIRST_EVENT_ID, event.getEventId());
+    objectStore.addNotificationEvent(event);
+    Assert.assertEquals(SECOND_EVENT_ID, event.getEventId());
+
+    // Verify that objectStore fetches the latest notification event ID
+    eventId = objectStore.getCurrentNotificationEventId();
+    Assert.assertEquals(SECOND_EVENT_ID, eventId.getEventId());
+
+    // Verify that getNextNotification() returns all events
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest());
+    Assert.assertEquals(2, eventResponse.getEventsSize());
+    Assert.assertEquals(FIRST_EVENT_ID, 
eventResponse.getEvents().get(0).getEventId());
+    Assert.assertEquals(SECOND_EVENT_ID, 
eventResponse.getEvents().get(1).getEventId());
+    // Verify that getNextNotification(last) returns events after a specified 
event
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest(FIRST_EVENT_ID));
+    Assert.assertEquals(1, eventResponse.getEventsSize());
+    Assert.assertEquals(SECOND_EVENT_ID, 
eventResponse.getEvents().get(0).getEventId());
+
+    // Verify that cleanNotificationEvents() cleans up all old notifications
+    Thread.sleep(1);
+    objectStore.cleanNotificationEvents(1);
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest());
+    Assert.assertEquals(0, eventResponse.getEventsSize());
+  }
+
+  /**
    * Test database operations
    */
   @Test

Reply via email to