Modified: hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=696427&r1=696426&r2=696427&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original) +++ hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Wed Sep 17 13:13:00 2008 @@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -57,10 +56,12 @@ import com.facebook.fb303.FacebookService; import com.facebook.fb303.fb_status; import com.facebook.thrift.TException; +import com.facebook.thrift.protocol.TBinaryProtocol; import com.facebook.thrift.server.TServer; import com.facebook.thrift.server.TThreadPoolServer; import com.facebook.thrift.transport.TServerSocket; import com.facebook.thrift.transport.TServerTransport; +import com.facebook.thrift.transport.TTransportFactory; /** * TODO:pc remove application logic to a separate interface. rename to MetaStoreServer @@ -68,11 +69,28 @@ public class HiveMetaStore extends ThriftHiveMetastore { public static class HMSHandler extends FacebookBase implements ThriftHiveMetastore.Iface{ - public static final Log LOG = LogFactory.getLog("hive.metastore"); + public static final Log LOG = LogFactory.getLog(HiveMetaStore.class.getName()); + private static boolean createDefaultDB = false; + private String rawStoreClassName; private HiveConf hiveConf; // stores datastore (jpox) properties, right now they come from jpox.properties - private RawStore ms; // a metadata store private Warehouse wh; // hdfs warehouse - + private ThreadLocal<RawStore> threadLocalMS = new ThreadLocal() { + protected synchronized Object initialValue() { + return null; + } + }; + + // The next serial number to be assigned + private static int nextSerialNum = 0; + private static ThreadLocal<Integer> threadLocalId = new ThreadLocal() { + protected synchronized Object initialValue() { + return new Integer(nextSerialNum++); + } + }; + public static Integer get() { + return threadLocalId.get(); + } + public HMSHandler(String name) throws MetaException { super(name); hiveConf = new HiveConf(this.getClass()); @@ -94,18 +112,42 @@ } private boolean init() throws MetaException { - String rawStoreClassName = hiveConf.get("hive.metastore.rawstore.impl"); + rawStoreClassName = hiveConf.get("hive.metastore.rawstore.impl"); wh = new Warehouse(hiveConf); - LOG.info("Opening raw store ... impl class:" + rawStoreClassName); - ms = (RawStore) ReflectionUtils.newInstance(getClass(rawStoreClassName, RawStore.class), hiveConf); - // create default database if it doesn't exist + createDefaultDB(); + return true; + } + + /** + * @return + * @throws MetaException + */ + private RawStore getMS() throws MetaException { + RawStore ms = threadLocalMS.get(); + if(ms == null) { + LOG.info(threadLocalId.get() + ": Opening raw store with implemenation class:" + rawStoreClassName); + ms = (RawStore) ReflectionUtils.newInstance(getClass(rawStoreClassName, RawStore.class), hiveConf); + threadLocalMS.set(ms); + ms = threadLocalMS.get(); + } + return ms; + } + + /** + * create default database if it doesn't exist + * @throws MetaException + */ + private void createDefaultDB() throws MetaException { + if(HMSHandler.createDefaultDB) { + return; + } try { - ms.getDatabase(MetaStoreUtils.DEFAULT_DATABASE_NAME); + getMS().getDatabase(MetaStoreUtils.DEFAULT_DATABASE_NAME); } catch (NoSuchObjectException e) { - ms.createDatabase(new Database(MetaStoreUtils.DEFAULT_DATABASE_NAME, + getMS().createDatabase(new Database(MetaStoreUtils.DEFAULT_DATABASE_NAME, wh.getDefaultDatabasePath(MetaStoreUtils.DEFAULT_DATABASE_NAME).toString())); } - return true; + HMSHandler.createDefaultDB = true; } private Class<?> getClass(String rawStoreClassName, Class<RawStore> class1) throws MetaException { @@ -115,31 +157,46 @@ throw new MetaException(rawStoreClassName + " class not found"); } } + + private void logStartFunction(String m) { + LOG.info(threadLocalId.get().toString() + ": " + m); + } + private void logStartFunction(String f, String db, String tbl) { + LOG.info(threadLocalId.get().toString() + ": " + f + " : db=" + db + " tbl=" + tbl); + } + @Override public int getStatus() { return fb_status.ALIVE; } public void shutdown() { - LOG.info("Shutting down the object store..."); - ms.shutdown(); - super.shutdown(); + logStartFunction("Shutting down the object store..."); + try { + if(threadLocalMS.get() != null) { + getMS().shutdown(); + } + } catch (MetaException e) { + LOG.error("unable to shutdown metastore", e); + } + System.exit(0); } public boolean create_database(String name, String location_uri) throws AlreadyExistsException, MetaException { this.incrementCounter("create_database"); + logStartFunction("create_database: " + name); boolean success = false; try { - ms.openTransaction(); + getMS().openTransaction(); Database db = new Database(name, location_uri); - if(ms.createDatabase(db) && wh.mkdirs(wh.getDefaultDatabasePath(name))) { - success = ms.commitTransaction(); + if(getMS().createDatabase(db) && wh.mkdirs(wh.getDefaultDatabasePath(name))) { + success = getMS().commitTransaction(); } } finally { if(!success) { - ms.rollbackTransaction(); + getMS().rollbackTransaction(); } } return success; @@ -147,24 +204,25 @@ public Database get_database(String name) throws NoSuchObjectException, MetaException { this.incrementCounter("get_database"); - return ms.getDatabase(name); + logStartFunction("get_database: " + name); + return getMS().getDatabase(name); } public boolean drop_database(String name) throws MetaException { this.incrementCounter("drop_database"); + logStartFunction("drop_database: " + name); if(name.equalsIgnoreCase(MetaStoreUtils.DEFAULT_DATABASE_NAME)) { throw new MetaException("Can't drop default database"); } boolean success = false; try { - ms.openTransaction(); - success = ms.dropDatabase(name); - if(ms.dropDatabase(name)) { - success = ms.commitTransaction(); + getMS().openTransaction(); + if(getMS().dropDatabase(name)) { + success = getMS().commitTransaction(); } } finally { if(!success) { - ms.rollbackTransaction(); + getMS().rollbackTransaction(); } else { wh.deleteDir(wh.getDefaultDatabasePath(name), true); // it is not a terrible thing even if the data is not deleted @@ -175,45 +233,51 @@ public List<String> get_databases() throws MetaException { this.incrementCounter("get_databases"); - return ms.getDatabases(); + logStartFunction("get_databases"); + return getMS().getDatabases(); } public boolean create_type(Type type) throws AlreadyExistsException, MetaException, InvalidObjectException { this.incrementCounter("create_type"); + logStartFunction("create_type: " + type.getName()); // check whether type already exists if(get_type(type.getName()) != null) { throw new AlreadyExistsException("Type " + type.getName() + " already exists"); } //TODO:pc Validation of types should be done by clients or here???? - return ms.createType(type); + return getMS().createType(type); } public Type get_type(String name) throws MetaException { this.incrementCounter("get_type"); - return ms.getType(name); + logStartFunction("get_type: " + name); + return getMS().getType(name); } - public boolean drop_type(String type) throws MetaException { + public boolean drop_type(String name) throws MetaException { this.incrementCounter("drop_type"); + logStartFunction("drop_type: " + name); // TODO:pc validate that there are no types that refer to this - return ms.dropType(type); + return getMS().dropType(name); } public Map<String, Type> get_type_all(String name) throws MetaException { this.incrementCounter("get_type_all"); // TODO Auto-generated method stub - return null; + logStartFunction("get_type_all"); + throw new MetaException("Not yet implemented"); } public void create_table(Table tbl) throws AlreadyExistsException, MetaException, InvalidObjectException { this.incrementCounter("create_table"); + logStartFunction("create_table: db=" + tbl.getDatabase() + " tbl=" + tbl.getTableName()); boolean success = false; if(!MetaStoreUtils.validateName(tbl.getTableName())) { throw new InvalidObjectException(tbl.getTableName() + " is not a valid object name"); } try { - ms.openTransaction(); + getMS().openTransaction(); Path tblPath = null; if(tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { tblPath = wh.getDefaultTablePath(tbl.getDatabase(), tbl.getTableName()); @@ -228,24 +292,25 @@ } } catch (NoSuchObjectException e) { } - ms.createTable(tbl); + getMS().createTable(tbl); if(wh.mkdirs(tblPath)) { - success = ms.commitTransaction(); + success = getMS().commitTransaction(); } } finally { if(!success) { - ms.rollbackTransaction(); + getMS().rollbackTransaction(); } } } public void drop_table(String dbname, String name, boolean deleteData) throws NoSuchObjectException, MetaException { this.incrementCounter("drop_table"); + logStartFunction("drop_table", dbname, name); boolean success = false; Path tblPath = null; try { - ms.openTransaction(); + getMS().openTransaction(); // drop any partitions Table tbl = get_table(dbname, name); if (tbl == null) { @@ -254,14 +319,14 @@ if(tbl.getSd() == null || tbl.getSd().getLocation() == null) { throw new MetaException("Table metadata is corrupted"); } - if(!ms.dropTable(dbname, name)) { + if(!getMS().dropTable(dbname, name)) { throw new MetaException("Unable to drop table"); } - success = ms.commitTransaction(); + success = getMS().commitTransaction(); tblPath = new Path(tbl.getSd().getLocation()); } finally { if(!success) { - ms.rollbackTransaction(); + getMS().rollbackTransaction(); } else if(deleteData && (tblPath != null)) { wh.deleteDir(tblPath, true); // ok even if the data is not deleted @@ -271,7 +336,8 @@ public Table get_table(String dbname, String name) throws MetaException, NoSuchObjectException { this.incrementCounter("get_table"); - Table t = ms.getTable(dbname, name); + logStartFunction("get_table", dbname, name); + Table t = getMS().getTable(dbname, name); if(t == null) { throw new NoSuchObjectException(dbname + "." + name + " table not found"); } @@ -282,6 +348,7 @@ Map<String, String> params) throws NoSuchObjectException, MetaException { this.incrementCounter("set_table_parameters"); + logStartFunction("set_table_parameters", dbname, name); // TODO Auto-generated method stub return false; } @@ -289,16 +356,22 @@ public Partition append_partition(String dbName, String tableName, List<String> part_vals) throws InvalidObjectException, AlreadyExistsException, MetaException { this.incrementCounter("append_partition"); + logStartFunction("append_partition", dbName, tableName); Partition part = new Partition(); boolean success = false; try { - ms.openTransaction(); + getMS().openTransaction(); part = new Partition(); part.setDatabase(dbName); part.setTableName(tableName); part.setValues(part_vals); - Table tbl = ms.getTable(part.getDatabase(), part.getTableName()); + Partition old_part = this.get_partition(part.getDatabase(), part.getTableName(), part.getValues()); + if( old_part != null) { + throw new AlreadyExistsException("Partition already exists:" + part); + } + + Table tbl = getMS().getTable(part.getDatabase(), part.getTableName()); if(tbl == null) { throw new InvalidObjectException("Unable to add partition because table or database do not exist"); } @@ -306,16 +379,16 @@ Path partLocation = new Path(tbl.getSd().getLocation(), Warehouse.makePartName(tbl.getPartitionKeys(), part_vals)); part.getSd().setLocation(partLocation.toString()); - success = ms.addPartition(part); + success = getMS().addPartition(part); if(success) { - success = ms.commitTransaction(); + success = getMS().commitTransaction(); } } finally { if(!success) { - ms.rollbackTransaction(); + getMS().rollbackTransaction(); } else { Path path = new Path(part.getSd().getLocation()); - MetaStoreUtils.makeDir(path, hiveConf); + wh.mkdirs(path); } } return part; @@ -324,24 +397,29 @@ public Partition add_partition(Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { this.incrementCounter("add_partition"); + logStartFunction("add_partition", part.getDatabase(), part.getTableName()); boolean success = false; try { - ms.openTransaction(); - Table tbl = ms.getTable(part.getDatabase(), part.getTableName()); + getMS().openTransaction(); + Partition old_part = this.get_partition(part.getDatabase(), part.getTableName(), part.getValues()); + if( old_part != null) { + throw new AlreadyExistsException("Partition already exists:" + part); + } + Table tbl = getMS().getTable(part.getDatabase(), part.getTableName()); if(tbl == null) { throw new InvalidObjectException("Unable to add partition because table or database do not exist"); } // add partition - success = ms.addPartition(part); + success = getMS().addPartition(part); if(success) { - success = ms.commitTransaction(); + success = getMS().commitTransaction(); } } finally { if(!success) { - ms.rollbackTransaction(); + getMS().rollbackTransaction(); } else { Path path = new Path(part.getSd().getLocation()); - MetaStoreUtils.makeDir(path, hiveConf); + wh.mkdirs(path); } } return part; @@ -350,54 +428,62 @@ public boolean drop_partition(String db_name, String tbl_name, List<String> part_vals, boolean deleteData) throws NoSuchObjectException, MetaException, TException { this.incrementCounter("drop_partition"); + logStartFunction("drop_partition", db_name, tbl_name); // TODO:pc drop the data as needed - return ms.dropPartition(db_name, tbl_name, part_vals); + return getMS().dropPartition(db_name, tbl_name, part_vals); } public Partition get_partition(String db_name, String tbl_name, List<String> part_vals) throws MetaException { this.incrementCounter("get_partition"); - return ms.getPartition(db_name, tbl_name, part_vals); + logStartFunction("get_partition", db_name, tbl_name); + return getMS().getPartition(db_name, tbl_name, part_vals); } public List<Partition> get_partitions(String db_name, String tbl_name, short max_parts) throws NoSuchObjectException, MetaException { this.incrementCounter("get_partitions"); - return ms.getPartitions(db_name, tbl_name, max_parts); + logStartFunction("get_partitions", db_name, tbl_name); + return getMS().getPartitions(db_name, tbl_name, max_parts); } public boolean alter_partitions(StorageDescriptor sd, List<String> parts) throws InvalidOperationException, MetaException { this.incrementCounter("alter_partitions"); + logStartFunction("alter_partitions"); // TODO Auto-generated method stub - return false; + throw new MetaException("Not yet implemented"); } public boolean set_partition_parameters(String db_name, String tbl_name, String pname, Map<String, String> params) throws NoSuchObjectException, MetaException { this.incrementCounter("set_partition_parameters"); + logStartFunction("set_partition_parameters: db=" + db_name + " tbl=" + tbl_name); // TODO Auto-generated method stub - return false; + throw new MetaException("Not yet implemented"); } public boolean create_index(Index index_def) throws IndexAlreadyExistsException, MetaException { this.incrementCounter("create_index"); + logStartFunction("truncate_table: db=" + index_def.getTableName() + " tbl=" + index_def.getTableName() + " name=" + index_def.getIndexName()); // TODO Auto-generated method stub - return false; + throw new MetaException("Not yet implemented"); } public String getVersion() throws TException { this.incrementCounter("getVersion"); + logStartFunction("getVersion"); return "3.0"; } public void alter_table(String dbname, String name, Table newTable) throws InvalidOperationException, MetaException { this.incrementCounter("alter_table"); + logStartFunction("truncate_table: db=" + dbname + " tbl=" + name + " newtbl=" + newTable.getTableName()); try { - ms.alterTable(dbname, name, newTable); + getMS().alterTable(dbname, name, newTable); } catch (InvalidObjectException e) { LOG.error(StringUtils.stringifyException(e)); throw new InvalidOperationException("alter is not possible"); @@ -408,14 +494,16 @@ public List<String> cat(String db_name, String table_name, String partition, int high) throws MetaException, UnknownDBException, UnknownTableException { this.incrementCounter("cat"); + logStartFunction("cat: db=" + db_name + " tbl=" + table_name + " part=" + partition + " high=" + high); // TODO Auto-generated method stub - return null; + throw new MetaException("Not implemented. Please use select * query instead"); } @Override public List<String> get_tables(String dbname, String pattern) throws MetaException { this.incrementCounter("get_tables"); - return ms.getTables(dbname, pattern); + logStartFunction("get_tables: db=" + dbname + " pat=" + pattern); + return getMS().getTables(dbname, pattern); } @Override @@ -423,6 +511,7 @@ throws MetaException, UnknownTableException, UnknownDBException { // TODO Auto-generated method stub this.incrementCounter("truncate_table"); + logStartFunction("truncate_table: db=" + db_name + " tbl=" + table_name); } /** @@ -479,10 +568,11 @@ return ret; } - public ArrayList<FieldSchema> get_fields(String db, String table_name) throws MetaException,UnknownTableException, UnknownDBException { + public ArrayList<FieldSchema> get_fields(String db, String tableName) throws MetaException,UnknownTableException, UnknownDBException { this.incrementCounter("get_fields"); + logStartFunction("get_fields: db=" + db + "tbl=" + tableName); ArrayList<FieldSchema> str_fields = new ArrayList<FieldSchema>(); - String [] names = table_name.split("\\."); + String [] names = tableName.split("\\."); String base_table_name = names[0]; List<SerDeField> hive_fields = new ArrayList<SerDeField>(); @@ -531,7 +621,7 @@ } } } - + /** * @param args */ @@ -545,15 +635,18 @@ TServerTransport serverTransport = new TServerSocket(port); Iface handler = new HMSHandler("new db based metaserver"); FacebookService.Processor processor = new ThriftHiveMetastore.Processor(handler); - TServer server = new TThreadPoolServer(processor, serverTransport); - HMSHandler.LOG.info("Starting the new metaserver on port [" + port + "]..."); - + TThreadPoolServer.Options options = new TThreadPoolServer.Options(); + options.minWorkerThreads = 200; + TServer server = new TThreadPoolServer(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options); + HMSHandler.LOG.info("Started the new metaserver on port [" + port + "]..."); + HMSHandler.LOG.info("Options.minWorkerThreads = " + options.minWorkerThreads); + HMSHandler.LOG.info("Options.maxWorkerThreads = " + options.maxWorkerThreads); server.serve(); - } catch (Exception x) { x.printStackTrace(); } } - }
Modified: hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=696427&r1=696426&r2=696427&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original) +++ hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Wed Sep 17 13:13:00 2008 @@ -34,6 +34,7 @@ import javax.jdo.PersistenceManagerFactory; import javax.jdo.Query; import javax.jdo.Transaction; +import javax.jdo.datastore.DataStoreCache; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,18 +71,26 @@ public class ObjectStore implements RawStore, Configurable { @SuppressWarnings("nls") private static final String JPOX_CONFIG = "jpox.properties"; - private static Properties prop; + private static Properties prop = null; + private static PersistenceManagerFactory pmf = null; + private static final Log LOG = LogFactory.getLog(ObjectStore.class.getName()); + private static enum TXN_STATUS { + NO_STATE, + OPEN, + COMMITED, + ROLLBACK + } private boolean isInitialized = false; private PersistenceManager pm = null; private Configuration hiveConf; - @SuppressWarnings("nls") - public static final Log LOG = LogFactory.getLog(ObjectStore.class.getName()); + private int openTrasactionCalls = 0; + private Transaction currentTransaction = null; + private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE; public ObjectStore() {} @Override public Configuration getConf() { - // TODO Auto-generated method stub return hiveConf; } @@ -112,8 +121,8 @@ @SuppressWarnings("nls") private void initialize() { LOG.info("ObjectStore, initialize called"); - getDataSourceProps(); - pm = createPersistenceManager(); + initDataSourceProps(); + pm = getPersistenceManager(); if(pm != null) isInitialized = true; return; @@ -124,7 +133,10 @@ * jpox.properties. */ @SuppressWarnings("nls") - private void getDataSourceProps() { + private void initDataSourceProps() { + if(prop != null) { + return; + } URL url= classLoader.getResource(JPOX_CONFIG); prop = new Properties(); if (url == null) { @@ -167,10 +179,26 @@ prop.setProperty(param, val); } } - - private static PersistenceManager createPersistenceManager() { - PersistenceManagerFactory pmf = JDOHelper.getPersistenceManagerFactory(prop); - return pmf.getPersistenceManager(); + private static PersistenceManagerFactory getPMF() { + if(pmf == null) { + pmf = JDOHelper.getPersistenceManagerFactory(prop); + DataStoreCache dsc = pmf.getDataStoreCache(); + if(dsc != null) { + dsc.pinAll(true, MTable.class); + dsc.pinAll(true, MStorageDescriptor.class); + dsc.pinAll(true, MSerDeInfo.class); + dsc.pinAll(true, MPartition.class); + dsc.pinAll(true, MDatabase.class); + dsc.pinAll(true, MType.class); + dsc.pinAll(true, MFieldSchema.class); + dsc.pinAll(true, MOrder.class); + } + } + return pmf; + } + + private PersistenceManager getPersistenceManager() { + return getPMF().getPersistenceManager(); } public void shutdown() { @@ -179,16 +207,6 @@ } } - private int openTrasactionCalls = 0; - private Transaction currentTransaction = null; - private static enum TXN_STATUS { - NO_STATE, - OPEN, - COMMITED, - ROLLBACK - } - private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE; - /** * Opens a new one or the one already created * Every call of this function must have corresponding commit or rollback function call @@ -621,7 +639,7 @@ private List<Order> convertToOrders(List<MOrder> mkeys) { List<Order> keys = null; if(mkeys != null) { - keys = new ArrayList<Order>(mkeys.size()); + keys = new ArrayList<Order>(); for (MOrder part : mkeys) { keys.add(new Order(part.getCol(), part.getOrder())); } @@ -817,8 +835,8 @@ query.declareParameters("java.lang.String t1, java.lang.String t2"); mparts = (List<MPartition>) query.execute(tableName.trim(), dbName.trim()); pm.retrieveAll(mparts); - success = commitTransaction(); + LOG.debug("Done e xecuting listMPartitions"); } finally { if(!success) { rollbackTransaction();
