Author: cws
Date: Thu Jun 30 07:33:42 2011
New Revision: 1141430
URL: http://svn.apache.org/viewvc?rev=1141430&view=rev
Log:
HIVE-2225. Purge expired metastore events (Ashutosh Chauhan via cws)
Added:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
(with props)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1141430&r1=1141429&r2=1141430&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu
Jun 30 07:33:42 2011
@@ -218,7 +218,8 @@ public class HiveConf extends Configurat
METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", ""),
// should we do checks against the storage (usually hdfs) for operations
like drop_partition
METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks",
false),
-
+ METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq",0L),
+ METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration",0L),
// Default parameters for creating tables
NEWTABLEDEFAULTPARA("hive.table.parameters.default",""),
Modified: hive/trunk/conf/hive-default.xml
URL:
http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1141430&r1=1141429&r2=1141430&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Thu Jun 30 07:33:42 2011
@@ -233,6 +233,18 @@
</property>
<property>
+ <name>hive.metastore.event.expiry.duration</name>
+ <value>0</value>
+ <description>Duration after which events expire from events table (in
seconds)</description>
+</property>
+
+<property>
+ <name>hive.metastore.event.clean.freq</name>
+ <value>0</value>
+ <description>Frequency at which timer task runs to purge expired events in
metastore (in seconds). A value of 0 disables the timer task.</description>
+</property>
+
+<property>
<name>hive.metastore.connect.retries</name>
<value>5</value>
<description>Number of retries while opening a connection to
metastore</description>
Modified:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL:
http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1141430&r1=1141429&r2=1141430&view=diff
==============================================================================
---
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
(original)
+++
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
Thu Jun 30 07:33:42 2011
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.common.cla
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.metrics.Metrics;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.Constants;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
@@ -255,6 +258,12 @@ public class HiveMetaStore extends Thrif
}
}
listeners = MetaStoreUtils.getMetaStoreListener(hiveConf);
+ long cleanFreq =
hiveConf.getLongVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ) * 1000L;
+ if(cleanFreq > 0){
+ // In default config, there is no timer.
+ Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
+ cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
+ }
return true;
}
Modified:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL:
http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1141430&r1=1141429&r2=1141430&view=diff
==============================================================================
---
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
(original)
+++
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
Thu Jun 30 07:33:42 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.common.Fil
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
@@ -3586,4 +3587,27 @@ public class ObjectStore implements RawS
}
return join(storedVals,',');
}
+
+  /**
+   * Bulk-deletes the MPartitionEvent instances selected by the JDOQL filter
+   * "curTime - eventTime > expiryTime", where expiryTime is the configured
+   * METASTORE_EVENT_EXPIRY_DURATION multiplied by 1000 and curTime is
+   * System.currentTimeMillis().
+   * NOTE(review): the multiplication implies the setting is in seconds and
+   * eventTime is in milliseconds -- confirm against MPartitionEvent.
+   *
+   * The delete runs between openTransaction() and commitTransaction(); when the
+   * commit is not reported as successful the transaction is rolled back.
+   *
+   * @return the count returned by the JDO bulk delete
+   */
+  @Override
+  public long cleanupEvents() {
+    LOG.debug("Begin executing cleanupEvents");
+    final Long maxAge =
+        HiveConf.getLongVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION) * 1000L;
+    final Long now = System.currentTimeMillis();
+    boolean committed = false;
+    final long purgedCount;
+    try {
+      openTransaction();
+      final String filter = "curTime - eventTime > expiryTime";
+      final String parameters = "java.lang.Long curTime, java.lang.Long expiryTime";
+      Query purge = pm.newQuery(MPartitionEvent.class, filter);
+      purge.declareParameters(parameters);
+      // Arguments are bound in declaration order: curTime first, then expiryTime.
+      purgedCount = purge.deletePersistentAll(now, maxAge);
+      committed = commitTransaction();
+    } finally {
+      if (!committed) {
+        rollbackTransaction();
+      }
+      LOG.debug("Done executing cleanupEvents");
+    }
+    return purgedCount;
+  }
}
Modified:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
URL:
http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java?rev=1141430&r1=1141429&r2=1141430&view=diff
==============================================================================
---
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
(original)
+++
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
Thu Jun 30 07:33:42 2011
@@ -286,4 +286,6 @@ public interface RawStore extends Config
public abstract List<Partition> listPartitionsPsWithAuth(String db_name,
String tbl_name,
List<String> part_vals, short max_parts, String userName, List<String>
groupNames)
throws MetaException, InvalidObjectException;
-}
\ No newline at end of file
+
+ /**
+  * Purges expired events from the store.
+  *
+  * @return the number of events that were deleted
+  */
+ public abstract long cleanupEvents();
+}
Added:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
URL:
http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java?rev=1141430&view=auto
==============================================================================
---
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
(added)
+++
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
Thu Jun 30 07:33:42 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * Timer task that purges expired metastore events by running
+ * {@link RawStore#cleanupEvents()} through the handler's executeWithRetry.
+ */
+public class EventCleanerTask extends TimerTask {
+
+  public static final Log LOG = LogFactory.getLog(EventCleanerTask.class);
+  private final HMSHandler handler;
+
+  /**
+   * @param handler metastore handler used to execute the cleanup command
+   */
+  public EventCleanerTask(HMSHandler handler) {
+    super();
+    this.handler = handler;
+  }
+
+  /**
+   * Runs one cleanup pass and logs the number of deleted events when it is
+   * greater than zero. Any exception is logged and not rethrown, because an
+   * exception escaping a TimerTask terminates the owning java.util.Timer thread.
+   */
+  @Override
+  public void run() {
+    try {
+      long deleteCnt = handler.executeWithRetry(new Command<Long>() {
+        @Override
+        public Long run(RawStore ms) throws Exception {
+          return ms.cleanupEvents();
+        }
+      });
+      if (deleteCnt > 0L) {
+        LOG.info("Number of events deleted from event Table: " + deleteCnt);
+      }
+    } catch (Exception e) {
+      // Pass the exception as the Throwable argument so the stack trace is
+      // recorded; LOG.error(e) alone would log only e.toString().
+      LOG.error("Failed to purge expired metastore events", e);
+    }
+  }
+}
Propchange:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java
URL:
http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java?rev=1141430&r1=1141429&r2=1141430&view=diff
==============================================================================
---
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java
(original)
+++
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java
Thu Jun 30 07:33:42 2011
@@ -25,6 +25,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -46,6 +47,8 @@ public class TestMarkPartition extends T
protected void setUp() throws Exception {
super.setUp();
+ System.setProperty(ConfVars.METASTORE_EVENT_CLEAN_FREQ.varname, "2");
+ System.setProperty(ConfVars.METASTORE_EVENT_EXPIRY_DURATION.varname, "5");
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -56,7 +59,7 @@ public class TestMarkPartition extends T
public void testMarkingPartitionSet() throws CommandNeedRetryException,
MetaException,
TException, NoSuchObjectException, UnknownDBException, UnknownTableException,
- InvalidPartitionException, UnknownPartitionException {
+ InvalidPartitionException, UnknownPartitionException, InterruptedException {
HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf, null);
driver = new Driver(hiveConf);
driver.run("drop database if exists tmpdb cascade");
@@ -69,6 +72,8 @@ public class TestMarkPartition extends T
kvs.put("b", "'2011'");
msc.markPartitionForEvent("tmpdb", "tmptbl", kvs,
PartitionEventType.LOAD_DONE);
assert msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs,
PartitionEventType.LOAD_DONE);
+ Thread.sleep(10000);
+ assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs,
PartitionEventType.LOAD_DONE);
kvs.put("b", "'2012'");
assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs,
PartitionEventType.LOAD_DONE);