http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
new file mode 100644
index 0000000..ffb2abd
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -0,0 +1,9313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import javax.jdo.JDOCanRetryException;
+import javax.jdo.JDODataStoreException;
+import javax.jdo.JDOException;
+import javax.jdo.JDOHelper;
+import javax.jdo.JDOObjectNotFoundException;
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Query;
+import javax.jdo.Transaction;
+import javax.jdo.datastore.DataStoreCache;
+import javax.jdo.datastore.JDOConnection;
+import javax.jdo.identity.IntIdentity;
+import javax.sql.DataSource;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRow;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
+import org.apache.hadoop.hive.metastore.model.MConstraint;
+import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
+import org.apache.hadoop.hive.metastore.model.MDatabase;
+import org.apache.hadoop.hive.metastore.model.MDelegationToken;
+import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MFunction;
+import org.apache.hadoop.hive.metastore.model.MGlobalPrivilege;
+import org.apache.hadoop.hive.metastore.model.MIndex;
+import org.apache.hadoop.hive.metastore.model.MMasterKey;
+import org.apache.hadoop.hive.metastore.model.MNotificationLog;
+import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
+import org.apache.hadoop.hive.metastore.model.MOrder;
+import org.apache.hadoop.hive.metastore.model.MPartition;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
+import org.apache.hadoop.hive.metastore.model.MPartitionEvent;
+import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
+import org.apache.hadoop.hive.metastore.model.MResourceUri;
+import org.apache.hadoop.hive.metastore.model.MRole;
+import org.apache.hadoop.hive.metastore.model.MRoleMap;
+import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
+import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
+import org.apache.hadoop.hive.metastore.model.MStringList;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
+import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
+import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
+import org.apache.hadoop.hive.metastore.model.MType;
+import org.apache.hadoop.hive.metastore.model.MVersionTable;
+import org.apache.hadoop.hive.metastore.model.MMetastoreDBProperties;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.ObjectPair;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.thrift.TException;
+import org.datanucleus.AbstractNucleusContext;
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.ClassLoaderResolverImpl;
+import org.datanucleus.NucleusContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+import org.datanucleus.store.rdbms.exceptions.MissingTableException;
+import org.datanucleus.store.scostore.Store;
+import org.datanucleus.util.WeakValueMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+
/**
 * This class is the interface between the application logic and the database
 * store that contains the objects. Refrain from putting any logic in model.M*
 * objects or in this file, as the former could be auto-generated and this class
 * would need to be made into an interface that can read both from a database
 * and a filestore.
 */
+public class ObjectStore implements RawStore, Configurable {
  // JDO configuration and factory are JVM-wide: all ObjectStore instances in
  // this process share one PersistenceManagerFactory. Guarded by pmfPropLock.
  private static Properties prop = null;
  private static PersistenceManagerFactory pmf = null;
  // Test-only escape hatch: when true, a stale pmf is NOT closed on reconfig
  // (some tests run two metastores in one JVM sharing the factory).
  private static boolean forTwoMetastoreTesting = false;

  // Protects the static pmf/prop pair above during setConf()/initialize().
  private static Lock pmfPropLock = new ReentrantLock();
  /**
  * Verify the schema only once per JVM since the db connection info is static
  */
  private final static AtomicBoolean isSchemaVerified = new AtomicBoolean(false);
  private static final Logger LOG = LoggerFactory.getLogger(ObjectStore.class.getName());

  // State machine for the (per-instance) current JDO transaction.
  // NOTE: the "COMMITED" misspelling is kept as-is; renaming would touch
  // every reference elsewhere in this file.
  private enum TXN_STATUS {
    NO_STATE, OPEN, COMMITED, ROLLBACK
  }

  // Map from lowercase type token (as used in the cache.pinobjtypes conf
  // value) to the model class pinned in the DataNucleus L2 cache.
  private static final Map<String, Class> PINCLASSMAP;
  private static final String HOSTNAME;  // local host address, for audit/event info
  private static final String USER;      // value of $USER env var, "UNKNOWN" if unset
  private static final String JDO_PARAM = ":param";  // prefix for generated JDOQL parameter names
  static {
    Map<String, Class> map = new HashMap<>();
    map.put("table", MTable.class);
    map.put("storagedescriptor", MStorageDescriptor.class);
    map.put("serdeinfo", MSerDeInfo.class);
    map.put("partition", MPartition.class);
    map.put("database", MDatabase.class);
    map.put("type", MType.class);
    map.put("fieldschema", MFieldSchema.class);
    map.put("order", MOrder.class);
    PINCLASSMAP = Collections.unmodifiableMap(map);
    String hostname = "UNKNOWN";
    try {
      InetAddress clientAddr = InetAddress.getLocalHost();
      hostname = clientAddr.getHostAddress();
    } catch (IOException e) {
      // best-effort: fall through and keep "UNKNOWN"
    }
    HOSTNAME = hostname;
    String user = System.getenv("USER");
    if (user == null) {
      USER = "UNKNOWN";
    } else {
      USER = user;
    }
  }


  // ---- per-instance state (one ObjectStore is used by a single thread) ----
  private boolean isInitialized = false;
  private PersistenceManager pm = null;
  private SQLGenerator sqlGenerator = null;
  private MetaStoreDirectSql directSql = null;          // non-null only when direct SQL is enabled
  private DatabaseProduct dbType = null;
  private PartitionExpressionProxy expressionProxy = null;
  private Configuration conf;
  // Depth of nested openTransaction() calls; spelling kept for compatibility
  // with the rest of this file.
  private volatile int openTrasactionCalls = 0;
  private Transaction currentTransaction = null;
  private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE;
  private Pattern partitionValidationPattern;
  private Counter directSqlErrors;   // metrics counter; null when metrics are disabled
+
  /**
   * An AutoCloseable wrapper around the JDO Query class, used to pass a Query
   * object out to a caller and let the caller release its resources when the
   * QueryWrapper goes out of scope (e.g. via try-with-resources).
   */
  public static class QueryWrapper implements AutoCloseable {
    // The wrapped query; may be null if never assigned or already closed.
    public Query query;

    /**
     * Explicitly closes the query object to release the resources.
     * Idempotent: safe to call when no query was ever set.
     */
    @Override
    public void close() {
      if (query != null) {
        query.closeAll();
        query = null;
      }
    }
  }
+
  // No-arg constructor required for instantiation via ReflectionUtils; all
  // real initialization happens in setConf().
  public ObjectStore() {
  }
+
  /** @return the Configuration last passed to {@link #setConf(Configuration)}. */
  @Override
  public Configuration getConf() {
    return conf;
  }
+
  /**
   * Called whenever this object is instantiated using ReflectionUtils, and also
   * on connection retries. In cases of connection retries, conf will usually
   * contain modified values.
   *
   * Rebuilds the shared PersistenceManagerFactory when the datasource
   * properties changed, then (re)initializes this instance's
   * PersistenceManager and related state.
   */
  @Override
  @SuppressWarnings("nls")
  public void setConf(Configuration conf) {
    // Although an instance of ObjectStore is accessed by one thread, there may
    // be many threads with ObjectStore instances. So the static variables
    // pmf and prop need to be protected with locks.
    pmfPropLock.lock();
    try {
      isInitialized = false;
      this.conf = conf;
      configureSSL(conf);
      Properties propsFromConf = getDataSourceProps(conf);
      boolean propsChanged = !propsFromConf.equals(prop);

      if (propsChanged) {
        if (pmf != null){
          clearOutPmfClassLoaderCache(pmf);
          if (!forTwoMetastoreTesting) {
            // close the underlying connection pool to avoid leaks
            pmf.close();
          }
        }
        // Force getPMF() to build a fresh factory from the new properties.
        pmf = null;
        prop = null;
      }

      assert(!isActiveTransaction());
      shutdown();
      // Always want to re-create pm as we don't know if it were created by the
      // most recent instance of the pmf
      pm = null;
      directSql = null;
      expressionProxy = null;
      openTrasactionCalls = 0;
      currentTransaction = null;
      transactionStatus = TXN_STATUS.NO_STATE;

      initialize(propsFromConf);

      String partitionValidationRegex =
          MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
      } else {
        partitionValidationPattern = null;
      }

      // Note, if metrics have not been initialized this will return null, which means we aren't
      // using metrics.  Thus we should always check whether this is non-null before using.
      MetricRegistry registry = Metrics.getRegistry();
      if (registry != null) {
        directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS);
      }

      if (!isInitialized) {
        throw new RuntimeException(
        "Unable to create persistence manager. Check dss.log for details");
      } else {
        LOG.info("Initialized ObjectStore");
      }
    } finally {
      pmfPropLock.unlock();
    }
  }
+
  // ClassLoader used for this store; prefer the thread context loader, fall
  // back to the loader that loaded ObjectStore itself (instance initializer
  // runs before any constructor body).
  private ClassLoader classLoader;
  {
    classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      classLoader = ObjectStore.class.getClassLoader();
    }
  }
+
+  @SuppressWarnings("nls")
+  private void initialize(Properties dsProps) {
+    int retryLimit = MetastoreConf.getIntVar(conf, 
ConfVars.HMSHANDLERATTEMPTS);
+    long retryInterval = MetastoreConf.getTimeVar(conf,
+        ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
+    int numTries = retryLimit;
+
+    while (numTries > 0){
+      try {
+        initializeHelper(dsProps);
+        return; // If we reach here, we succeed.
+      } catch (Exception e){
+        numTries--;
+        boolean retriable = isRetriableException(e);
+        if ((numTries > 0) && retriable){
+          LOG.info("Retriable exception while instantiating ObjectStore, 
retrying. "
+              + numTries + " tries left", e);
+          try {
+            Thread.sleep(retryInterval);
+          } catch (InterruptedException ie) {
+            // Restore the interrupted status, since we do not want to catch 
it.
+            LOG.debug("Interrupted while sleeping before retrying.",ie);
+            Thread.currentThread().interrupt();
+          }
+          // If we're here, we'll proceed down the next while loop iteration.
+        } else {
+          // we've reached our limit, throw the last one.
+          if (retriable){
+            LOG.warn("Exception retry limit reached, not retrying any longer.",
+              e);
+          } else {
+            LOG.debug("Non-retriable exception during ObjectStore 
initialize.", e);
+          }
+          throw e;
+        }
+      }
+    }
+  }
+
+  private static final Set<Class<? extends Throwable>> 
retriableExceptionClasses =
+      new HashSet<>(Arrays.asList(JDOCanRetryException.class));
+  /**
+   * Helper function for initialize to determine if we should retry an 
exception.
+   * We return true if the exception is of a known type of retriable 
exceptions, or if one
+   * of its recursive .getCause returns a known type of retriable exception.
+   */
+  private boolean isRetriableException(Throwable e) {
+    if (e == null){
+      return false;
+    }
+    if (retriableExceptionClasses.contains(e.getClass())){
+      return true;
+    }
+    for (Class<? extends Throwable> c : retriableExceptionClasses){
+      if (c.isInstance(e)){
+        return true;
+      }
+    }
+
+    if (e.getCause() == null){
+      return false;
+    }
+    return isRetriableException(e.getCause());
+  }
+
  /**
   * private helper to do initialization routine, so we can retry if needed if it fails.
   * Creates the PersistenceManager, the SQL generator for the detected
   * database product, and (when enabled) the direct-SQL accessor.
   * Sets {@code isInitialized} true only when a PersistenceManager was obtained.
   * @param dsProps datasource properties; stored in the static {@code prop}
   */
  private void initializeHelper(Properties dsProps) {
    LOG.info("ObjectStore, initialize called");
    prop = dsProps;
    pm = getPersistenceManager();
    try {
      String productName = MetaStoreDirectSql.getProductName(pm);
      sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf);
    } catch (SQLException e) {
      // Without knowing the DB product we cannot generate correct SQL; abort.
      LOG.error("error trying to figure out the database product", e);
      throw new RuntimeException(e);
    }
    isInitialized = pm != null;
    if (isInitialized) {
      dbType = determineDatabaseProduct();
      expressionProxy = createExpressionProxy(conf);
      if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) {
        String schema = prop.getProperty("javax.jdo.mapping.Schema");
        // Treat an empty schema property the same as unset.
        if (schema != null && schema.isEmpty()) {
          schema = null;
        }
        directSql = new MetaStoreDirectSql(pm, conf, schema);
      }
    }
    LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
        " created in the thread with id: " + Thread.currentThread().getId());
  }
+
+  private DatabaseProduct determineDatabaseProduct() {
+    try {
+      return DatabaseProduct.determineDatabaseProduct(getProductName(pm));
+    } catch (SQLException e) {
+      LOG.warn("Cannot determine database product; assuming OTHER", e);
+      return DatabaseProduct.OTHER;
+    }
+  }
+
+  private static String getProductName(PersistenceManager pm) {
+    JDOConnection jdoConn = pm.getDataStoreConnection();
+    try {
+      return 
((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
+    } catch (Throwable t) {
+      LOG.warn("Error retrieving product name", t);
+      return null;
+    } finally {
+      jdoConn.close(); // We must release the connection before we call other 
pm methods.
+    }
+  }
+
+  /**
+   * Creates the proxy used to evaluate expressions. This is here to prevent 
circular
+   * dependency - ql -&gt; metastore client &lt;-&gt metastore server -&gt ql. 
If server and
+   * client are split, this can be removed.
+   * @param conf Configuration.
+   * @return The partition expression proxy.
+   */
+  private static PartitionExpressionProxy createExpressionProxy(Configuration 
conf) {
+    String className = MetastoreConf.getVar(conf, 
ConfVars.EXPRESSION_PROXY_CLASS);
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends PartitionExpressionProxy> clazz =
+           JavaUtils.getClass(className, PartitionExpressionProxy.class);
+      return JavaUtils.newInstance(clazz, new Class<?>[0], new Object[0]);
+    } catch (MetaException e) {
+      LOG.error("Error loading PartitionExpressionProxy", e);
+      throw new RuntimeException("Error loading PartitionExpressionProxy: " + 
e.getMessage());
+    }
+  }
+
+  /**
+   * Configure the SSL properties of the connection from provided config
+   * @param conf
+   */
+  private static void configureSSL(Configuration conf) {
+    // SSL support
+    String sslPropString = MetastoreConf.getVar(conf, 
ConfVars.DBACCESS_SSL_PROPS);
+    if (org.apache.commons.lang.StringUtils.isNotEmpty(sslPropString)) {
+      LOG.info("Metastore setting SSL properties of the connection to backed 
DB");
+      for (String sslProp : sslPropString.split(",")) {
+        String[] pair = sslProp.trim().split("=");
+        if (pair != null && pair.length == 2) {
+          System.setProperty(pair[0].trim(), pair[1].trim());
+        } else {
+          LOG.warn("Invalid metastore property value for " + 
ConfVars.DBACCESS_SSL_PROPS);
+        }
+      }
+    }
+  }
+
  /**
   * Properties specified in hive-default.xml override the properties specified
   * in jpox.properties.
   * Builds the Properties handed to the PersistenceManagerFactory: all known
   * datanucleus/javax.jdo conf vars, plus the metastore password resolved via
   * the credential-provider-aware getPassword().
   * @param conf the metastore configuration to read from
   * @return datasource properties for JDO
   */
  @SuppressWarnings("nls")
  private static Properties getDataSourceProps(Configuration conf) {
    Properties prop = new Properties();
    correctAutoStartMechanism(conf);

    // First, go through and set all our values for datanucleus and javax.jdo parameters.  This
    // has to be a separate first step because we don't set the default values in the config object.
    for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) {
      String confVal = MetastoreConf.getAsString(conf, var);
      Object prevVal = prop.setProperty(var.varname, confVal);
      if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(var.varname)) {
        LOG.debug("Overriding " + var.varname + " value " + prevVal
            + " from  jpox.properties with " + confVal);
      }
    }

    // Now, we need to look for any values that the user set that MetastoreConf doesn't know about.
    // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly
    // interpolated in case of variables.  See HIVE-17788.
    /*
    for (Map.Entry<String, String> e : conf) {
      if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) {
        // We have to handle this differently depending on whether it is a value known to
        // MetastoreConf or not.  If it is, we need to get the default value if a value isn't
        // provided.  If not, we just set whatever the user has set.
        Object prevVal = prop.setProperty(e.getKey(), e.getValue());
        if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) {
          LOG.debug("Overriding " + e.getKey() + " value " + prevVal
              + " from  jpox.properties with " + e.getValue());
        }
      }
    }
    */

    // Password may no longer be in the conf, use getPassword()
    try {
      String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
      if (passwd != null && !passwd.isEmpty()) {
        // We can get away with the use of varname here because varname == hiveName for PWD
        prop.setProperty(ConfVars.PWD.varname, passwd);
      }
    } catch (IOException err) {
      throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err);
    }

    // Dump the (printable) resolved properties for troubleshooting.
    if (LOG.isDebugEnabled()) {
      for (Entry<Object, Object> e : prop.entrySet()) {
        if (MetastoreConf.isPrintable(e.getKey().toString())) {
          LOG.debug(e.getKey() + " = " + e.getValue());
        }
      }
    }

    return prop;
  }
+
+  /**
+   * Update conf to set datanucleus.autoStartMechanismMode=ignored.
+   * This is necessary to able to use older version of hive against
+   * an upgraded but compatible metastore schema in db from new version
+   * of hive
+   * @param conf
+   */
+  private static void correctAutoStartMechanism(Configuration conf) {
+    final String autoStartKey = "datanucleus.autoStartMechanismMode";
+    final String autoStartIgnore = "ignored";
+    String currentAutoStartVal = conf.get(autoStartKey);
+    if(currentAutoStartVal != null && 
!currentAutoStartVal.equalsIgnoreCase(autoStartIgnore)) {
+      LOG.warn(autoStartKey + " is set to unsupported value " + 
conf.get(autoStartKey) +
+          " . Setting it to value " + autoStartIgnore);
+    }
+    conf.set(autoStartKey, autoStartIgnore);
+  }
+
  /**
   * Lazily creates (once per JVM) and returns the shared
   * PersistenceManagerFactory. When a pooled DataSource provider is
   * configured it is preferred; on failure we fall back to the plain
   * JDO properties. Also pins the configured model classes in the
   * DataNucleus L2 cache. Synchronized because pmf is static.
   */
  private static synchronized PersistenceManagerFactory getPMF() {
    if (pmf == null) {

      Configuration conf = MetastoreConf.newMetastoreConf();
      DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf);
      if (dsp == null) {
        pmf = JDOHelper.getPersistenceManagerFactory(prop);
      } else {
        try {
          DataSource ds = dsp.create(conf);
          Map<Object, Object> dsProperties = new HashMap<>();
          //Any preexisting datanucleus property should be passed along
          dsProperties.putAll(prop);
          dsProperties.put("datanucleus.ConnectionFactory", ds);
          dsProperties.put("javax.jdo.PersistenceManagerFactoryClass",
              "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
          pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
        } catch (SQLException e) {
          // Pool creation failed; degrade gracefully to non-pooled properties.
          LOG.warn("Could not create PersistenceManagerFactory using " +
              "connection pool properties, will fall back", e);
          pmf = JDOHelper.getPersistenceManagerFactory(prop);
        }
      }
      DataStoreCache dsc = pmf.getDataStoreCache();
      if (dsc != null) {
        String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES);
        LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"" + objTypes + "\"");
        if (objTypes != null && objTypes.length() > 0) {
          objTypes = objTypes.toLowerCase();
          String[] typeTokens = objTypes.split(",");
          for (String type : typeTokens) {
            type = type.trim();
            // Only types present in PINCLASSMAP are pinnable; anything else
            // is a user typo worth surfacing.
            if (PINCLASSMAP.containsKey(type)) {
              dsc.pinAll(true, PINCLASSMAP.get(type));
            }
            else {
              LOG.warn(type + " is not one of the pinnable object types: " + org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " "));
            }
          }
        }
      } else {
        LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
      }
    }
    return pmf;
  }
+
  /** @return a new PersistenceManager from the shared (lazily created) factory. */
  @InterfaceAudience.LimitedPrivate({"HCATALOG"})
  @InterfaceStability.Evolving
  public PersistenceManager getPersistenceManager() {
    return getPMF().getPersistenceManager();
  }
+
+  @Override
+  public void shutdown() {
+    if (pm != null) {
+      LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
+          " will be shutdown");
+      pm.close();
+      pm = null;
+    }
+  }
+
+  /**
+   * Opens a new one or the one already created Every call of this function 
must
+   * have corresponding commit or rollback function call
+   *
+   * @return an active transaction
+   */
+
+  @Override
+  public boolean openTransaction() {
+    openTrasactionCalls++;
+    if (openTrasactionCalls == 1) {
+      currentTransaction = pm.currentTransaction();
+      currentTransaction.begin();
+      transactionStatus = TXN_STATUS.OPEN;
+    } else {
+      // openTransactionCalls > 1 means this is an interior transaction
+      // We should already have a transaction created that is active.
+      if ((currentTransaction == null) || (!currentTransaction.isActive())){
+        throw new RuntimeException("openTransaction called in an interior"
+            + " transaction scope, but currentTransaction is not active.");
+      }
+    }
+
+    boolean result = currentTransaction.isActive();
+    debugLog("Open transaction: count = " + openTrasactionCalls + ", isActive 
= " + result);
+    return result;
+  }
+
+  /**
+   * if this is the commit of the first open call then an actual commit is
+   * called.
+   *
+   * @return Always returns true
+   */
+  @Override
+  @SuppressWarnings("nls")
+  public boolean commitTransaction() {
+    if (TXN_STATUS.ROLLBACK == transactionStatus) {
+      debugLog("Commit transaction: rollback");
+      return false;
+    }
+    if (openTrasactionCalls <= 0) {
+      RuntimeException e = new RuntimeException("commitTransaction was called 
but openTransactionCalls = "
+          + openTrasactionCalls + ". This probably indicates that there are 
unbalanced " +
+          "calls to openTransaction/commitTransaction");
+      LOG.error("Unbalanced calls to open/commit Transaction", e);
+      throw e;
+    }
+    if (!currentTransaction.isActive()) {
+      RuntimeException e = new RuntimeException("commitTransaction was called 
but openTransactionCalls = "
+          + openTrasactionCalls + ". This probably indicates that there are 
unbalanced " +
+          "calls to openTransaction/commitTransaction");
+      LOG.error("Unbalanced calls to open/commit Transaction", e);
+      throw e;
+    }
+    openTrasactionCalls--;
+    debugLog("Commit transaction: count = " + openTrasactionCalls + ", 
isactive "+ currentTransaction.isActive());
+
+    if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
+      transactionStatus = TXN_STATUS.COMMITED;
+      currentTransaction.commit();
+    }
+    return true;
+  }
+
+  /**
+   * @return true if there is an active transaction. If the current transaction
+   *         is either committed or rolled back it returns false
+   */
+  public boolean isActiveTransaction() {
+    if (currentTransaction == null) {
+      return false;
+    }
+    return currentTransaction.isActive();
+  }
+
+  /**
+   * Rolls back the current transaction if it is active
+   */
+  @Override
+  public void rollbackTransaction() {
+    if (openTrasactionCalls < 1) {
+      debugLog("rolling back transaction: no open transactions: " + 
openTrasactionCalls);
+      return;
+    }
+    debugLog("Rollback transaction, isActive: " + 
currentTransaction.isActive());
+    try {
+      if (currentTransaction.isActive()
+          && transactionStatus != TXN_STATUS.ROLLBACK) {
+        currentTransaction.rollback();
+      }
+    } finally {
+      openTrasactionCalls = 0;
+      transactionStatus = TXN_STATUS.ROLLBACK;
+      // remove all detached objects from the cache, since the transaction is
+      // being rolled back they are no longer relevant, and this prevents them
+      // from reattaching in future transactions
+      pm.evictAll();
+    }
+  }
+
+  @Override
+  public void createDatabase(Database db) throws InvalidObjectException, 
MetaException {
+    boolean commited = false;
+    MDatabase mdb = new MDatabase();
+    mdb.setName(db.getName().toLowerCase());
+    mdb.setLocationUri(db.getLocationUri());
+    mdb.setDescription(db.getDescription());
+    mdb.setParameters(db.getParameters());
+    mdb.setOwnerName(db.getOwnerName());
+    PrincipalType ownerType = db.getOwnerType();
+    mdb.setOwnerType((null == ownerType ? PrincipalType.USER.name() : 
ownerType.name()));
+    try {
+      openTransaction();
+      pm.makePersistent(mdb);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  @SuppressWarnings("nls")
+  private MDatabase getMDatabase(String name) throws NoSuchObjectException {
+    MDatabase mdb = null;
+    boolean commited = false;
+    Query query = null;
+    try {
+      openTransaction();
+      name = normalizeIdentifier(name);
+      query = pm.newQuery(MDatabase.class, "name == dbname");
+      query.declareParameters("java.lang.String dbname");
+      query.setUnique(true);
+      mdb = (MDatabase) query.execute(name);
+      pm.retrieve(mdb);
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    if (mdb == null) {
+      throw new NoSuchObjectException("There is no database named " + name);
+    }
+    return mdb;
+  }
+
+  @Override
+  public Database getDatabase(String name) throws NoSuchObjectException {
+    MetaException ex = null;
+    Database db = null;
+    try {
+      db = getDatabaseInternal(name);
+    } catch (MetaException e) {
+      // Signature restriction to NSOE, and NSOE being a flat exception 
prevents us from
+      // setting the cause of the NSOE as the MetaException. We should not 
lose the info
+      // we got here, but it's very likely that the MetaException is 
irrelevant and is
+      // actually an NSOE message, so we should log it and throw an NSOE with 
the msg.
+      ex = e;
+    }
+    if (db == null) {
+      LOG.warn("Failed to get database " + name +", returning 
NoSuchObjectException", ex);
+      throw new NoSuchObjectException(name + (ex == null ? "" : (": " + 
ex.getMessage())));
+    }
+    return db;
+  }
+
  /**
   * Fetches a database, trying the direct-SQL fast path first and falling back
   * to the JDO path via {@link GetDbHelper} (both flags passed as true enable
   * both paths; the helper decides which to run).
   * @param name database name
   * @return the thrift Database object
   * @throws MetaException on persistence failure
   * @throws NoSuchObjectException if no database with this name exists
   */
  public Database getDatabaseInternal(String name) throws MetaException, NoSuchObjectException {
    return new GetDbHelper(name, true, true) {
      @Override
      protected Database getSqlResult(GetHelper<Database> ctx) throws MetaException {
        // Direct SQL path.
        return directSql.getDatabase(dbName);
      }

      @Override
      protected Database getJdoResult(GetHelper<Database> ctx) throws MetaException, NoSuchObjectException {
        // JDO fallback path.
        return getJDODatabase(dbName);
      }
    }.run(false);
  }
+
+  public Database getJDODatabase(String name) throws NoSuchObjectException {
+    MDatabase mdb = null;
+    boolean commited = false;
+    try {
+      openTransaction();
+      mdb = getMDatabase(name);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+    Database db = new Database();
+    db.setName(mdb.getName());
+    db.setDescription(mdb.getDescription());
+    db.setLocationUri(mdb.getLocationUri());
+    db.setParameters(convertMap(mdb.getParameters()));
+    db.setOwnerName(mdb.getOwnerName());
+    String type = mdb.getOwnerType();
+    db.setOwnerType((null == type || type.trim().isEmpty()) ? null : 
PrincipalType.valueOf(type));
+    return db;
+  }
+
+  /**
+   * Alter the database object in metastore. Currently only the parameters
+   * of the database or the owner can be changed.
+   * @param dbName the database name
+   * @param db the Hive Database object
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  @Override
+  public boolean alterDatabase(String dbName, Database db)
+    throws MetaException, NoSuchObjectException {
+
+    MDatabase mdb = null;
+    boolean committed = false;
+    try {
+      mdb = getMDatabase(dbName);
+      mdb.setParameters(db.getParameters());
+      mdb.setOwnerName(db.getOwnerName());
+      if (db.getOwnerType() != null) {
+        mdb.setOwnerType(db.getOwnerType().name());
+      }
+      if (org.apache.commons.lang.StringUtils.isNotBlank(db.getDescription())) 
{
+        mdb.setDescription(db.getDescription());
+      }
+      if (org.apache.commons.lang.StringUtils.isNotBlank(db.getLocationUri())) 
{
+        mdb.setLocationUri(db.getLocationUri());
+      }
+      openTransaction();
+      pm.makePersistent(mdb);
+      committed = commitTransaction();
+    } finally {
+      if (!committed) {
+        rollbackTransaction();
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean dropDatabase(String dbname) throws NoSuchObjectException, 
MetaException {
+    boolean success = false;
+    LOG.info("Dropping database " + dbname + " along with all tables");
+    dbname = normalizeIdentifier(dbname);
+    QueryWrapper queryWrapper = new QueryWrapper();
+    try {
+      openTransaction();
+
+      // then drop the database
+      MDatabase db = getMDatabase(dbname);
+      pm.retrieve(db);
+      if (db != null) {
+        List<MDBPrivilege> dbGrants = this.listDatabaseGrants(dbname, 
queryWrapper);
+        if (dbGrants != null && dbGrants.size() > 0) {
+          pm.deletePersistentAll(dbGrants);
+        }
+        pm.deletePersistent(db);
+      }
+      success = commitTransaction();
+    } finally {
+      rollbackAndCleanup(success, queryWrapper);
+    }
+    return success;
+  }
+
+  @Override
+  public List<String> getDatabases(String pattern) throws MetaException {
+    if (pattern == null || pattern.equals("*")) {
+      return getAllDatabases();
+    }
+    boolean commited = false;
+    List<String> databases = null;
+    Query query = null;
+    try {
+      openTransaction();
+      // Take the pattern and split it on the | to get all the composing
+      // patterns
+      String[] subpatterns = pattern.trim().split("\\|");
+      StringBuilder filterBuilder = new StringBuilder();
+      List<String> parameterVals = new ArrayList<>(subpatterns.length);
+      appendPatternCondition(filterBuilder, "name", subpatterns, 
parameterVals);
+      query = pm.newQuery(MDatabase.class, filterBuilder.toString());
+      query.setResult("name");
+      query.setOrdering("name ascending");
+      Collection names = (Collection) 
query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
+      databases = new ArrayList<>();
+      for (Iterator i = names.iterator(); i.hasNext();) {
+        databases.add((String) i.next());
+      }
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    return databases;
+  }
+
+  @Override
+  public List<String> getAllDatabases() throws MetaException {
+    boolean commited = false;
+    List<String> databases = null;
+
+    String queryStr = "select name from 
org.apache.hadoop.hive.metastore.model.MDatabase";
+    Query query = null;
+
+    openTransaction();
+    try {
+      query = pm.newQuery(queryStr);
+      query.setResult("name");
+      databases = new ArrayList<>((Collection<String>) query.execute());
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    Collections.sort(databases);
+    return databases;
+  }
+
+  private MType getMType(Type type) {
+    List<MFieldSchema> fields = new ArrayList<>();
+    if (type.getFields() != null) {
+      for (FieldSchema field : type.getFields()) {
+        fields.add(new MFieldSchema(field.getName(), field.getType(), field
+            .getComment()));
+      }
+    }
+    return new MType(type.getName(), type.getType1(), type.getType2(), fields);
+  }
+
+  private Type getType(MType mtype) {
+    List<FieldSchema> fields = new ArrayList<>();
+    if (mtype.getFields() != null) {
+      for (MFieldSchema field : mtype.getFields()) {
+        fields.add(new FieldSchema(field.getName(), field.getType(), field
+            .getComment()));
+      }
+    }
+    Type ret = new Type();
+    ret.setName(mtype.getName());
+    ret.setType1(mtype.getType1());
+    ret.setType2(mtype.getType2());
+    ret.setFields(fields);
+    return ret;
+  }
+
+  @Override
+  public boolean createType(Type type) {
+    boolean success = false;
+    MType mtype = getMType(type);
+    boolean commited = false;
+    try {
+      openTransaction();
+      pm.makePersistent(mtype);
+      commited = commitTransaction();
+      success = true;
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+
+  @Override
+  public Type getType(String typeName) {
+    Type type = null;
+    boolean commited = false;
+    Query query = null;
+    try {
+      openTransaction();
+      query = pm.newQuery(MType.class, "name == typeName");
+      query.declareParameters("java.lang.String typeName");
+      query.setUnique(true);
+      MType mtype = (MType) query.execute(typeName.trim());
+      pm.retrieve(type);
+      if (mtype != null) {
+        type = getType(mtype);
+      }
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    return type;
+  }
+
+  @Override
+  public boolean dropType(String typeName) {
+    boolean success = false;
+    Query query = null;
+    try {
+      openTransaction();
+      query = pm.newQuery(MType.class, "name == typeName");
+      query.declareParameters("java.lang.String typeName");
+      query.setUnique(true);
+      MType type = (MType) query.execute(typeName.trim());
+      pm.retrieve(type);
+      if (type != null) {
+        pm.deletePersistent(type);
+      }
+      success = commitTransaction();
+    } catch (JDOObjectNotFoundException e) {
+      success = commitTransaction();
+      LOG.debug("type not found " + typeName, e);
+    } finally {
+      rollbackAndCleanup(success, query);
+    }
+    return success;
+  }
+
+  @Override
+  public List<String> createTableWithConstraints(Table tbl,
+    List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
+    List<SQLUniqueConstraint> uniqueConstraints,
+    List<SQLNotNullConstraint> notNullConstraints)
+    throws InvalidObjectException, MetaException {
+    boolean success = false;
+    try {
+      openTransaction();
+      createTable(tbl);
+      // Add constraints.
+      // We need not do a deep retrieval of the Table Column Descriptor while 
persisting the
+      // constraints since this transaction involving create table is not yet 
committed.
+      List<String> constraintNames = addPrimaryKeys(primaryKeys, false);
+      constraintNames.addAll(addForeignKeys(foreignKeys, false));
+      constraintNames.addAll(addUniqueConstraints(uniqueConstraints, false));
+      constraintNames.addAll(addNotNullConstraints(notNullConstraints, false));
+      success = commitTransaction();
+      return constraintNames;
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  @Override
+  public void createTable(Table tbl) throws InvalidObjectException, 
MetaException {
+    boolean commited = false;
+    try {
+      openTransaction();
+      MTable mtbl = convertToMTable(tbl);
+      pm.makePersistent(mtbl);
+
+      PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
+      List<Object> toPersistPrivObjs = new ArrayList<>();
+      if (principalPrivs != null) {
+        int now = (int)(System.currentTimeMillis()/1000);
+
+        Map<String, List<PrivilegeGrantInfo>> userPrivs = 
principalPrivs.getUserPrivileges();
+        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, 
PrincipalType.USER);
+
+        Map<String, List<PrivilegeGrantInfo>> groupPrivs = 
principalPrivs.getGroupPrivileges();
+        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, 
PrincipalType.GROUP);
+
+        Map<String, List<PrivilegeGrantInfo>> rolePrivs = 
principalPrivs.getRolePrivileges();
+        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, 
PrincipalType.ROLE);
+      }
+      pm.makePersistentAll(toPersistPrivObjs);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  /**
+   * Convert PrivilegeGrantInfo from privMap to MTablePrivilege, and add all of
+   * them to the toPersistPrivObjs. These privilege objects will be persisted 
as
+   * part of createTable.
+   *
+   * @param mtbl
+   * @param toPersistPrivObjs
+   * @param now
+   * @param privMap
+   * @param type
+   */
+  private void putPersistentPrivObjects(MTable mtbl, List<Object> 
toPersistPrivObjs,
+      int now, Map<String, List<PrivilegeGrantInfo>> privMap, PrincipalType 
type) {
+    if (privMap != null) {
+      for (Map.Entry<String, List<PrivilegeGrantInfo>> entry : privMap
+          .entrySet()) {
+        String principalName = entry.getKey();
+        List<PrivilegeGrantInfo> privs = entry.getValue();
+        for (int i = 0; i < privs.size(); i++) {
+          PrivilegeGrantInfo priv = privs.get(i);
+          if (priv == null) {
+            continue;
+          }
+          MTablePrivilege mTblSec = new MTablePrivilege(
+              principalName, type.toString(), mtbl, priv.getPrivilege(),
+              now, priv.getGrantor(), priv.getGrantorType().toString(), priv
+                  .isGrantOption());
+          toPersistPrivObjs.add(mTblSec);
+        }
+      }
+    }
+  }
+
  /**
   * Drops a table and everything hanging off it: table/column grants,
   * partition/partition-column grants, column statistics, constraints and the
   * storage descriptor. The deletion order matters — dependent rows are
   * removed before the table row itself.
   * @return true when the enclosing transaction committed
   */
  @Override
  public boolean dropTable(String dbName, String tableName) throws MetaException,
    NoSuchObjectException, InvalidObjectException, InvalidInputException {
    boolean success = false;
    try {
      openTransaction();
      MTable tbl = getMTable(dbName, tableName);
      pm.retrieve(tbl);
      if (tbl != null) {
        // first remove all the grants
        List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
        if (tabGrants != null && tabGrants.size() > 0) {
          pm.deletePersistentAll(tabGrants);
        }
        List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName,
            tableName);
        if (tblColGrants != null && tblColGrants.size() > 0) {
          pm.deletePersistentAll(tblColGrants);
        }

        List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName);
        if (partGrants != null && partGrants.size() > 0) {
          pm.deletePersistentAll(partGrants);
        }

        List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName,
            tableName);
        if (partColGrants != null && partColGrants.size() > 0) {
          pm.deletePersistentAll(partColGrants);
        }
        // delete column statistics if present
        try {
          deleteTableColumnStatistics(dbName, tableName, null);
        } catch (NoSuchObjectException e) {
          // Missing stats are fine: there is simply nothing to delete.
          LOG.info("Found no table level column statistics associated with db " + dbName +
          " table " + tableName + " record to delete");
        }

        // Remove any constraints in which this table is parent or child.
        List<MConstraint> tabConstraints = listAllTableConstraintsWithOptionalConstraintName(
                                           dbName, tableName, null);
        if (tabConstraints != null && tabConstraints.size() > 0) {
          pm.deletePersistentAll(tabConstraints);
        }

        preDropStorageDescriptor(tbl.getSd());
        // then remove the table
        pm.deletePersistentAll(tbl);
      }
      success = commitTransaction();
    } finally {
      if (!success) {
        rollbackTransaction();
      }
    }
    return success;
  }
+
+  private List<MConstraint> listAllTableConstraintsWithOptionalConstraintName
+    (String dbName, String tableName, String constraintname) {
+    dbName = normalizeIdentifier(dbName);
+    tableName = normalizeIdentifier(tableName);
+    constraintname = 
constraintname!=null?normalizeIdentifier(constraintname):null;
+    List<MConstraint> mConstraints = null;
+    List<String> constraintNames = new ArrayList<>();
+    Query query = null;
+
+    try {
+      query = pm.newQuery("select constraintName from 
org.apache.hadoop.hive.metastore.model.MConstraint  where "
+        + "((parentTable.tableName == ptblname && parentTable.database.name == 
pdbname) || "
+        + "(childTable != null && childTable.tableName == ctblname && "
+        + "childTable.database.name == cdbname)) " + (constraintname != null ?
+        " && constraintName == constraintname" : ""));
+      query.declareParameters("java.lang.String ptblname, java.lang.String 
pdbname,"
+      + "java.lang.String ctblname, java.lang.String cdbname" +
+        (constraintname != null ? ", java.lang.String constraintname" : ""));
+      Collection<?> constraintNamesColl =
+        constraintname != null ?
+          ((Collection<?>) query.
+            executeWithArray(tableName, dbName, tableName, dbName, 
constraintname)):
+          ((Collection<?>) query.
+            executeWithArray(tableName, dbName, tableName, dbName));
+      for (Iterator<?> i = constraintNamesColl.iterator(); i.hasNext();) {
+        String currName = (String) i.next();
+        constraintNames.add(currName);
+      }
+      query = pm.newQuery(MConstraint.class);
+      query.setFilter("param.contains(constraintName)");
+      query.declareParameters("java.util.Collection param");
+      Collection<?> constraints = 
(Collection<?>)query.execute(constraintNames);
+      mConstraints = new ArrayList<>();
+      for (Iterator<?> i = constraints.iterator(); i.hasNext();) {
+        MConstraint currConstraint = (MConstraint) i.next();
+        mConstraints.add(currConstraint);
+      }
+    } finally {
+      if (query != null) {
+        query.closeAll();
+      }
+    }
+    return mConstraints;
+  }
+
+  @Override
+  public Table getTable(String dbName, String tableName) throws MetaException {
+    boolean commited = false;
+    Table tbl = null;
+    try {
+      openTransaction();
+      tbl = convertToTable(getMTable(dbName, tableName));
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+    return tbl;
+  }
+
  /** Same as {@link #getTables(String, String, TableType)} with no table-type filter. */
  @Override
  public List<String> getTables(String dbName, String pattern) throws MetaException {
    return getTables(dbName, pattern, null);
  }
+
+  @Override
+  public List<String> getTables(String dbName, String pattern, TableType 
tableType) throws MetaException {
+    boolean commited = false;
+    Query query = null;
+    List<String> tbls = null;
+    try {
+      openTransaction();
+      dbName = normalizeIdentifier(dbName);
+      // Take the pattern and split it on the | to get all the composing
+      // patterns
+      List<String> parameterVals = new ArrayList<>();
+      StringBuilder filterBuilder = new StringBuilder();
+      //adds database.name == dbName to the filter
+      appendSimpleCondition(filterBuilder, "database.name", new String[] 
{dbName}, parameterVals);
+      if(pattern != null) {
+        appendPatternCondition(filterBuilder, "tableName", pattern, 
parameterVals);
+      }
+      if(tableType != null) {
+        appendPatternCondition(filterBuilder, "tableType", new String[] 
{tableType.toString()}, parameterVals);
+      }
+
+      query = pm.newQuery(MTable.class, filterBuilder.toString());
+      query.setResult("tableName");
+      query.setOrdering("tableName ascending");
+      Collection names = (Collection) 
query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
+      tbls = new ArrayList<>();
+      for (Iterator i = names.iterator(); i.hasNext();) {
+        tbls.add((String) i.next());
+      }
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    return tbls;
+  }
+
  /** @return total number of databases in the metastore. */
  @Override
  public int getDatabaseCount() throws MetaException {
    return getObjectCount("name", MDatabase.class.getName());
  }
+
  /** @return total number of partitions in the metastore. */
  @Override
  public int getPartitionCount() throws MetaException {
    return getObjectCount("partitionName", MPartition.class.getName());
  }
+
  /** @return total number of tables in the metastore. */
  @Override
  public int getTableCount() throws MetaException {
    return getObjectCount("tableName", MTable.class.getName());
  }
+
+  private int getObjectCount(String fieldName, String objName) {
+    Long result = 0L;
+    boolean commited = false;
+    Query query = null;
+    try {
+      openTransaction();
+      String queryStr =
+        "select count(" + fieldName + ") from " + objName;
+      query = pm.newQuery(queryStr);
+      result = (Long) query.execute();
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    return result.intValue();
+  }
+
+  @Override
+  public List<TableMeta> getTableMeta(String dbNames, String tableNames, 
List<String> tableTypes)
+      throws MetaException {
+
+    boolean commited = false;
+    Query query = null;
+    List<TableMeta> metas = new ArrayList<>();
+    try {
+      openTransaction();
+      // Take the pattern and split it on the | to get all the composing
+      // patterns
+      StringBuilder filterBuilder = new StringBuilder();
+      List<String> parameterVals = new ArrayList<>();
+      if (dbNames != null && !dbNames.equals("*")) {
+        appendPatternCondition(filterBuilder, "database.name", dbNames, 
parameterVals);
+      }
+      if (tableNames != null && !tableNames.equals("*")) {
+        appendPatternCondition(filterBuilder, "tableName", tableNames, 
parameterVals);
+      }
+      if (tableTypes != null && !tableTypes.isEmpty()) {
+        appendSimpleCondition(filterBuilder, "tableType", 
tableTypes.toArray(new String[0]), parameterVals);
+      }
+
+      query = pm.newQuery(MTable.class, filterBuilder.toString());
+      Collection<MTable> tables = (Collection<MTable>) 
query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
+      for (MTable table : tables) {
+        TableMeta metaData = new TableMeta(
+            table.getDatabase().getName(), table.getTableName(), 
table.getTableType());
+        metaData.setComments(table.getParameters().get("comment"));
+        metas.add(metaData);
+      }
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    return metas;
+  }
+
  /**
   * Appends a case-insensitive pattern match over {@code fieldName} for each
   * element to the filter being built; the pattern values are added to
   * {@code parameterVals} for positional binding.
   */
  private StringBuilder appendPatternCondition(StringBuilder filterBuilder, String fieldName,
      String[] elements, List<String> parameterVals) {
    return appendCondition(filterBuilder, fieldName, elements, true, parameterVals);
  }
+
+  private StringBuilder appendPatternCondition(StringBuilder builder,
+      String fieldName, String elements, List<String> parameters) {
+      elements = normalizeIdentifier(elements);
+    return appendCondition(builder, fieldName, elements.split("\\|"), true, 
parameters);
+  }
+
  /**
   * Appends exact-equality conditions over {@code fieldName} for each element;
   * values are added to {@code parameters} for positional binding.
   */
  private StringBuilder appendSimpleCondition(StringBuilder builder,
      String fieldName, String[] elements, List<String> parameters) {
    return appendCondition(builder, fieldName, elements, false, parameters);
  }
+
+  private StringBuilder appendCondition(StringBuilder builder,
+      String fieldName, String[] elements, boolean pattern, List<String> 
parameters) {
+    if (builder.length() > 0) {
+      builder.append(" && ");
+    }
+    builder.append(" (");
+    int length = builder.length();
+    for (String element : elements) {
+      if (pattern) {
+        element = "(?i)" + element.replaceAll("\\*", ".*");
+      }
+      parameters.add(element);
+      if (builder.length() > length) {
+        builder.append(" || ");
+      }
+      builder.append(fieldName);
+      if (pattern) {
+        
builder.append(".matches(").append(JDO_PARAM).append(parameters.size()).append(")");
+      } else {
+        builder.append(" == ").append(JDO_PARAM).append(parameters.size());
+      }
+    }
+    builder.append(" )");
+    return builder;
+  }
+
  /** @return names of every table in the database (pattern ".*" matches all). */
  @Override
  public List<String> getAllTables(String dbName) throws MetaException {
    return getTables(dbName, ".*");
  }
+
  /**
   * Pairs an MTable with its (optionally pre-fetched) column descriptor so a
   * caller that needs the CD gets it from the same fetch as the table.
   */
  class AttachedMTableInfo {
    MTable mtbl;
    // Column descriptor; populated only when the lookup requested it.
    MColumnDescriptor mcd;

    public AttachedMTableInfo() {}

    public AttachedMTableInfo(MTable mtbl, MColumnDescriptor mcd) {
      this.mtbl = mtbl;
      this.mcd = mcd;
    }
  }
+
+  private AttachedMTableInfo getMTable(String db, String table, boolean 
retrieveCD) {
+    AttachedMTableInfo nmtbl = new AttachedMTableInfo();
+    MTable mtbl = null;
+    boolean commited = false;
+    Query query = null;
+    try {
+      openTransaction();
+      db = normalizeIdentifier(db);
+      table = normalizeIdentifier(table);
+      query = pm.newQuery(MTable.class, "tableName == table && database.name 
== db");
+      query.declareParameters("java.lang.String table, java.lang.String db");
+      query.setUnique(true);
+      mtbl = (MTable) query.execute(table, db);
+      pm.retrieve(mtbl);
+      // Retrieving CD can be expensive and unnecessary, so do it only when 
required.
+      if (mtbl != null && retrieveCD) {
+        pm.retrieve(mtbl.getSd());
+        pm.retrieveAll(mtbl.getSd().getCD());
+        nmtbl.mcd = mtbl.getSd().getCD();
+      }
+      commited = commitTransaction();
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+    nmtbl.mtbl = mtbl;
+    return nmtbl;
+  }
+
+  private MTable getMTable(String db, String table) {
+    AttachedMTableInfo nmtbl = getMTable(db, table, false);
+    return nmtbl.mtbl;
+  }
+
+  @Override
+  public List<Table> getTableObjectsByName(String db, List<String> tbl_names) 
throws MetaException,
+      UnknownDBException {
+    List<Table> tables = new ArrayList<>();
+    boolean committed = false;
+    Query dbExistsQuery = null;
+    Query query = null;
+    try {
+      openTransaction();
+      db = normalizeIdentifier(db);
+      dbExistsQuery = pm.newQuery(MDatabase.class, "name == db");
+      dbExistsQuery.declareParameters("java.lang.String db");
+      dbExistsQuery.setUnique(true);
+      dbExistsQuery.setResult("name");
+      String dbNameIfExists = (String) dbExistsQuery.execute(db);
+      if (dbNameIfExists == null || dbNameIfExists.isEmpty()) {
+        throw new UnknownDBException("Could not find database " + db);
+      }
+
+      List<String> lowered_tbl_names = new ArrayList<>();
+      for (String t : tbl_names) {
+        lowered_tbl_names.add(normalizeIdentifier(t));
+      }
+      query = pm.newQuery(MTable.class);
+      query.setFilter("database.name == db && tbl_names.contains(tableName)");
+      query.declareParameters("java.lang.String db, java.util.Collection 
tbl_names");
+      Collection mtables = (Collection) query.execute(db, lowered_tbl_names);
+      for (Iterator iter = mtables.iterator(); iter.hasNext();) {
+        tables.add(convertToTable((MTable) iter.next()));
+      }
+      committed = commitTransaction();
+    } finally {
+      rollbackAndCleanup(committed, query);
+      if (dbExistsQuery != null) {
+        dbExistsQuery.closeAll();
+      }
+    }
+    return tables;
+  }
+
+  /** Makes shallow copy of a list to avoid DataNucleus mucking with our 
objects. */
+  private <T> List<T> convertList(List<T> dnList) {
+    return (dnList == null) ? null : Lists.newArrayList(dnList);
+  }
+
  /** Makes shallow copy of a map to avoid DataNucleus mucking with our objects. */
  private Map<String, String> convertMap(Map<String, String> dnMap) {
    // Depending on configuration, null values are trimmed or retrieved as
    // empty strings (see ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS).
    return MetaStoreUtils.trimMapNulls(dnMap,
        MetastoreConf.getBoolVar(getConf(), ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS));
  }
+
+  private Table convertToTable(MTable mtbl) throws MetaException {
+    if (mtbl == null) {
+      return null;
+    }
+    String tableType = mtbl.getTableType();
+    if (tableType == null) {
+      // for backwards compatibility with old metastore persistence
+      if (mtbl.getViewOriginalText() != null) {
+        tableType = TableType.VIRTUAL_VIEW.toString();
+      } else if (Boolean.parseBoolean(mtbl.getParameters().get("EXTERNAL"))) {
+        tableType = TableType.EXTERNAL_TABLE.toString();
+      } else {
+        tableType = TableType.MANAGED_TABLE.toString();
+      }
+    }
+    final Table t = new Table(mtbl.getTableName(), 
mtbl.getDatabase().getName(), mtbl
+        .getOwner(), mtbl.getCreateTime(), mtbl.getLastAccessTime(), mtbl
+        .getRetention(), convertToStorageDescriptor(mtbl.getSd()),
+        convertToFieldSchemas(mtbl.getPartitionKeys()), 
convertMap(mtbl.getParameters()),
+        mtbl.getViewOriginalText(), mtbl.getViewExpandedText(), tableType);
+    t.setRewriteEnabled(mtbl.isRewriteEnabled());
+    return t;
+  }
+
+  private MTable convertToMTable(Table tbl) throws InvalidObjectException,
+      MetaException {
+    if (tbl == null) {
+      return null;
+    }
+    MDatabase mdb = null;
+    try {
+      mdb = getMDatabase(tbl.getDbName());
+    } catch (NoSuchObjectException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw new InvalidObjectException("Database " + tbl.getDbName()
+          + " doesn't exist.");
+    }
+
+    // If the table has property EXTERNAL set, update table type
+    // accordingly
+    String tableType = tbl.getTableType();
+    boolean isExternal = 
Boolean.parseBoolean(tbl.getParameters().get("EXTERNAL"));
+    if (TableType.MANAGED_TABLE.toString().equals(tableType)) {
+      if (isExternal) {
+        tableType = TableType.EXTERNAL_TABLE.toString();
+      }
+    }
+    if (TableType.EXTERNAL_TABLE.toString().equals(tableType)) {
+      if (!isExternal) {
+        tableType = TableType.MANAGED_TABLE.toString();
+      }
+    }
+
+    // A new table is always created with a new column descriptor
+    return new MTable(normalizeIdentifier(tbl.getTableName()), mdb,
+        convertToMStorageDescriptor(tbl.getSd()), tbl.getOwner(), tbl
+        .getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(),
+        convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
+        tbl.getViewOriginalText(), tbl.getViewExpandedText(), 
tbl.isRewriteEnabled(),
+        tableType);
+  }
+
+  private List<MFieldSchema> convertToMFieldSchemas(List<FieldSchema> keys) {
+    List<MFieldSchema> mkeys = null;
+    if (keys != null) {
+      mkeys = new ArrayList<>(keys.size());
+      for (FieldSchema part : keys) {
+        mkeys.add(new MFieldSchema(part.getName().toLowerCase(),
+            part.getType(), part.getComment()));
+      }
+    }
+    return mkeys;
+  }
+
+  private List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) {
+    List<FieldSchema> keys = null;
+    if (mkeys != null) {
+      keys = new ArrayList<>(mkeys.size());
+      for (MFieldSchema part : mkeys) {
+        keys.add(new FieldSchema(part.getName(), part.getType(), part
+            .getComment()));
+      }
+    }
+    return keys;
+  }
+
+  private List<MOrder> convertToMOrders(List<Order> keys) {
+    List<MOrder> mkeys = null;
+    if (keys != null) {
+      mkeys = new ArrayList<>(keys.size());
+      for (Order part : keys) {
+        mkeys.add(new MOrder(normalizeIdentifier(part.getCol()), 
part.getOrder()));
+      }
+    }
+    return mkeys;
+  }
+
+  private List<Order> convertToOrders(List<MOrder> mkeys) {
+    List<Order> keys = null;
+    if (mkeys != null) {
+      keys = new ArrayList<>(mkeys.size());
+      for (MOrder part : mkeys) {
+        keys.add(new Order(part.getCol(), part.getOrder()));
+      }
+    }
+    return keys;
+  }
+
+  private SerDeInfo convertToSerDeInfo(MSerDeInfo ms) throws MetaException {
+    if (ms == null) {
+      throw new MetaException("Invalid SerDeInfo object");
+    }
+    return new SerDeInfo(ms.getName(), ms.getSerializationLib(), 
convertMap(ms.getParameters()));
+  }
+
+  private MSerDeInfo convertToMSerDeInfo(SerDeInfo ms) throws MetaException {
+    if (ms == null) {
+      throw new MetaException("Invalid SerDeInfo object");
+    }
+    return new MSerDeInfo(ms.getName(), ms.getSerializationLib(), ms
+        .getParameters());
+  }
+
+  /**
+   * Given a list of model field schemas, create a new model column descriptor.
+   * @param cols the columns the column descriptor contains
+   * @return a new column descriptor db-backed object
+   */
+  private MColumnDescriptor createNewMColumnDescriptor(List<MFieldSchema> 
cols) {
+    if (cols == null) {
+      return null;
+    }
+    return new MColumnDescriptor(cols);
+  }
+
+  // MSD and SD should be same objects. Not sure how to make then same right 
now
+  // MSerdeInfo *& SerdeInfo should be same as well
+  private StorageDescriptor convertToStorageDescriptor(
+      MStorageDescriptor msd,
+      boolean noFS) throws MetaException {
+    if (msd == null) {
+      return null;
+    }
+    List<MFieldSchema> mFieldSchemas = msd.getCD() == null ? null : 
msd.getCD().getCols();
+
+    StorageDescriptor sd = new StorageDescriptor(noFS ? null : 
convertToFieldSchemas(mFieldSchemas),
+        msd.getLocation(), msd.getInputFormat(), msd.getOutputFormat(), msd
+        .isCompressed(), msd.getNumBuckets(), convertToSerDeInfo(msd
+        .getSerDeInfo()), convertList(msd.getBucketCols()), convertToOrders(msd
+        .getSortCols()), convertMap(msd.getParameters()));
+    SkewedInfo skewedInfo = new 
SkewedInfo(convertList(msd.getSkewedColNames()),
+        convertToSkewedValues(msd.getSkewedColValues()),
+        covertToSkewedMap(msd.getSkewedColValueLocationMaps()));
+    sd.setSkewedInfo(skewedInfo);
+    sd.setStoredAsSubDirectories(msd.isStoredAsSubDirectories());
+    return sd;
+  }
+
+  private StorageDescriptor convertToStorageDescriptor(MStorageDescriptor msd)
+      throws MetaException {
+    return convertToStorageDescriptor(msd, false);
+  }
+
+  /**
+   * Convert a list of MStringList to a list of list string
+   *
+   * @param mLists
+   * @return
+   */
+  private List<List<String>> convertToSkewedValues(List<MStringList> mLists) {
+    List<List<String>> lists = null;
+    if (mLists != null) {
+      lists = new ArrayList<>(mLists.size());
+      for (MStringList element : mLists) {
+        lists.add(new ArrayList<>(element.getInternalList()));
+      }
+    }
+    return lists;
+  }
+
+  private List<MStringList> convertToMStringLists(List<List<String>> mLists) {
+    List<MStringList> lists = null ;
+    if (null != mLists) {
+      lists = new ArrayList<>();
+      for (List<String> mList : mLists) {
+        lists.add(new MStringList(mList));
+      }
+    }
+    return lists;
+  }
+
+  /**
+   * Convert a MStringList Map to a Map
+   * @param mMap
+   * @return
+   */
+  private Map<List<String>, String> covertToSkewedMap(Map<MStringList, String> 
mMap) {
+    Map<List<String>, String> map = null;
+    if (mMap != null) {
+      map = new HashMap<>(mMap.size());
+      Set<MStringList> keys = mMap.keySet();
+      for (MStringList key : keys) {
+        map.put(new ArrayList<>(key.getInternalList()), mMap.get(key));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Covert a Map to a MStringList Map
+   * @param mMap
+   * @return
+   */
+  private Map<MStringList, String> covertToMapMStringList(Map<List<String>, 
String> mMap) {
+    Map<MStringList, String> map = null;
+    if (mMap != null) {
+      map = new HashMap<>(mMap.size());
+      Set<List<String>> keys = mMap.keySet();
+      for (List<String> key : keys) {
+        map.put(new MStringList(key), mMap.get(key));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Converts a storage descriptor to a db-backed storage descriptor.  Creates 
a
+   *   new db-backed column descriptor object for this SD.
+   * @param sd the storage descriptor to wrap in a db-backed object
+   * @return the storage descriptor db-backed object
+   * @throws MetaException
+   */
+  private MStorageDescriptor convertToMStorageDescriptor(StorageDescriptor sd)
+      throws MetaException {
+    if (sd == null) {
+      return null;
+    }
+    MColumnDescriptor mcd = 
createNewMColumnDescriptor(convertToMFieldSchemas(sd.getCols()));
+    return convertToMStorageDescriptor(sd, mcd);
+  }
+
+  /**
+   * Converts a storage descriptor to a db-backed storage descriptor.  It 
points the
+   * storage descriptor's column descriptor to the one passed as an argument,
+   * so it does not create a new mcolumn descriptor object.
+   * @param sd the storage descriptor to wrap in a db-backed object
+   * @param mcd the db-backed column descriptor
+   * @return the db-backed storage descriptor object
+   * @throws MetaException
+   */
+  private MStorageDescriptor convertToMStorageDescriptor(StorageDescriptor sd,
+      MColumnDescriptor mcd) throws MetaException {
+    if (sd == null) {
+      return null;
+    }
+    return new MStorageDescriptor(mcd, sd
+        .getLocation(), sd.getInputFormat(), sd.getOutputFormat(), sd
+        .isCompressed(), sd.getNumBuckets(), convertToMSerDeInfo(sd
+        .getSerdeInfo()), sd.getBucketCols(),
+        convertToMOrders(sd.getSortCols()), sd.getParameters(),
+        (null == sd.getSkewedInfo()) ? null
+            : sd.getSkewedInfo().getSkewedColNames(),
+        convertToMStringLists((null == sd.getSkewedInfo()) ? null : 
sd.getSkewedInfo()
+            .getSkewedColValues()),
+        covertToMapMStringList((null == sd.getSkewedInfo()) ? null : 
sd.getSkewedInfo()
+            .getSkewedColValueLocationMaps()), sd.isStoredAsSubDirectories());
+  }
+
+  @Override
+  public boolean addPartitions(String dbName, String tblName, List<Partition> 
parts)
+      throws InvalidObjectException, MetaException {
+    boolean success = false;
+    openTransaction();
+    try {
+      List<MTablePrivilege> tabGrants = null;
+      List<MTableColumnPrivilege> tabColumnGrants = null;
+      MTable table = this.getMTable(dbName, tblName);
+      if 
("TRUE".equalsIgnoreCase(table.getParameters().get("PARTITION_LEVEL_PRIVILEGE")))
 {
+        tabGrants = this.listAllTableGrants(dbName, tblName);
+        tabColumnGrants = this.listTableAllColumnGrants(dbName, tblName);
+      }
+      List<Object> toPersist = new ArrayList<>();
+      for (Partition part : parts) {
+        if (!part.getTableName().equals(tblName) || 
!part.getDbName().equals(dbName)) {
+          throw new MetaException("Partition does not belong to target table "
+              + dbName + "." + tblName + ": " + part);
+        }
+        MPartition mpart = convertToMPart(part, true);
+        toPersist.add(mpart);
+        int now = (int)(System.currentTimeMillis()/1000);
+        if (tabGrants != null) {
+          for (MTablePrivilege tab: tabGrants) {
+            toPersist.add(new MPartitionPrivilege(tab.getPrincipalName(),
+                tab.getPrincipalType(), mpart, tab.getPrivilege(), now,
+                tab.getGrantor(), tab.getGrantorType(), tab.getGrantOption()));
+          }
+        }
+
+        if (tabColumnGrants != null) {
+          for (MTableColumnPrivilege col : tabColumnGrants) {
+            toPersist.add(new MPartitionColumnPrivilege(col.getPrincipalName(),
+                col.getPrincipalType(), mpart, col.getColumnName(), 
col.getPrivilege(),
+                now, col.getGrantor(), col.getGrantorType(), 
col.getGrantOption()));
+          }
+        }
+      }
+      if (toPersist.size() > 0) {
+        pm.makePersistentAll(toPersist);
+        pm.flush();
+      }
+
+      success = commitTransaction();
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+
+  private boolean isValidPartition(
+      Partition part, boolean ifNotExists) throws MetaException {
+    MetaStoreUtils.validatePartitionNameCharacters(part.getValues(),
+        partitionValidationPattern);
+    boolean doesExist = doesPartitionExist(
+        part.getDbName(), part.getTableName(), part.getValues());
+    if (doesExist && !ifNotExists) {
+      throw new MetaException("Partition already exists: " + part);
+    }
+    return !doesExist;
+  }
+
+  @Override
+  public boolean addPartitions(String dbName, String tblName,
+                               PartitionSpecProxy partitionSpec, boolean 
ifNotExists)
+      throws InvalidObjectException, MetaException {
+    boolean success = false;
+    openTransaction();
+    try {
+      List<MTablePrivilege> tabGrants = null;
+      List<MTableColumnPrivilege> tabColumnGrants = null;
+      MTable table = this.getMTable(dbName, tblName);
+      if 
("TRUE".equalsIgnoreCase(table.getParameters().get("PARTITION_LEVEL_PRIVILEGE")))
 {
+        tabGrants = this.listAllTableGrants(dbName, tblName);
+        tabColumnGrants = this.listTableAllColumnGrants(dbName, tblName);
+      }
+
+      if (!partitionSpec.getTableName().equals(tblName) || 
!partitionSpec.getDbName().equals(dbName)) {
+        throw new MetaException("Partition does not belong to target table "
+            + dbName + "." + tblName + ": " + partitionSpec);
+      }
+
+      PartitionSpecProxy.PartitionIterator iterator = 
partitionSpec.getPartitionIterator();
+
+      int now = (int)(System.currentTimeMillis()/1000);
+
+      while (iterator.hasNext()) {
+        Partition part = iterator.next();
+
+        if (isValidPartition(part, ifNotExists)) {
+          MPartition mpart = convertToMPart(part, true);
+          pm.makePersistent(mpart);
+          if (tabGrants != null) {
+            for (MTablePrivilege tab : tabGrants) {
+              pm.makePersistent(new MPartitionPrivilege(tab.getPrincipalName(),
+                  tab.getPrincipalType(), mpart, tab.getPrivilege(), now,
+                  tab.getGrantor(), tab.getGrantorType(), 
tab.getGrantOption()));
+            }
+          }
+
+          if (tabColumnGrants != null) {
+            for (MTableColumnPrivilege col : tabColumnGrants) {
+              pm.makePersistent(new 
MPartitionColumnPrivilege(col.getPrincipalName(),
+                  col.getPrincipalType(), mpart, col.getColumnName(), 
col.getPrivilege(),
+                  now, col.getGrantor(), col.getGrantorType(), 
col.getGrantOption()));
+            }
+          }
+        }
+      }
+
+      success = commitTransaction();
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+
+  @Override
+  public boolean addPartition(Partition part) throws InvalidObjectException,
+      MetaException {
+    boolean success = false;
+    boolean commited = false;
+    try {
+      MTable table = this.getMTable(part.getDbName(), part.getTableName());
+      List<MTablePrivilege> tabGrants = null;
+      List<MTableColumnPrivilege> tabColumnGrants = null;
+      if 
("TRUE".equalsIgnoreCase(table.getParameters().get("PARTITION_LEVEL_PRIVILEGE")))
 {
+        tabGrants = this.listAllTableGrants(part
+            .getDbName(), part.getTableName());
+        tabColumnGrants = this.listTableAllColumnGrants(
+            part.getDbName(), part.getTableName());
+      }
+      openTransaction();
+      MPartition mpart = convertToMPart(part, true);
+      pm.makePersistent(mpart);
+
+      int now = (int)(System.currentTimeMillis()/1000);
+      List<Object> toPersist = new ArrayList<>();
+      if (tabGrants != null) {
+        for (MTablePrivilege tab: tabGrants) {
+          MPartitionPrivilege partGrant = new MPartitionPrivilege(tab
+              .getPrincipalName(), tab.getPrincipalType(),
+              mpart, tab.getPrivilege(), now, tab.getGrantor(), tab
+                  .getGrantorType(), tab.getGrantOption());
+          toPersist.add(partGrant);
+        }
+      }
+
+      if (tabColumnGrants != null) {
+        for (MTableColumnPrivilege col : tabColumnGrants) {
+          MPartitionColumnPrivilege partColumn = new 
MPartitionColumnPrivilege(col
+              .getPrincipalName(), col.getPrincipalType(), mpart, col
+              .getColumnName(), col.getPrivilege(), now, col.getGrantor(), col
+              .getGrantorType(), col.getGrantOption());
+          toPersist.add(partColumn);
+        }
+
+        if (toPersist.size() > 0) {
+          pm.makePersistentAll(toPersist);
+        }
+      }
+
+      commited = commitTransaction();
+      success = true;
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+
+  @Override
+  public Partition getPartition(String dbName, String tableName,
+      List<String> part_vals) throws NoSuchObjectException, MetaException {
+    openTransaction();
+    Partition part = convertToPart(getMPartition(dbName, tableName, 
part_vals));
+    commitTransaction();
+    if(part == null) {
+      throw new NoSuchObjectException("partition values="
+          + part_vals.toString());
+    }
+    part.setValues(part_vals);
+    return part;
+  }
+
  /**
   * Looks up a single db-backed partition by its values.
   *
   * @param dbName database name (normalized before use)
   * @param tableName table name (normalized before use)
   * @param part_vals partition values used to build the partition name
   * @return the matching MPartition, or null when the table or partition
   *         does not exist
   * @throws MetaException when more than one partition matches, or when the
   *         stored partition name differs from the requested one
   */
  private MPartition getMPartition(String dbName, String tableName, List<String> part_vals)
      throws MetaException {
    List<MPartition> mparts = null;
    MPartition ret = null;
    boolean commited = false;
    Query query = null;
    try {
      openTransaction();
      dbName = normalizeIdentifier(dbName);
      tableName = normalizeIdentifier(tableName);
      MTable mtbl = getMTable(dbName, tableName);
      if (mtbl == null) {
        // No such table: commit the (read-only) transaction and report "not found".
        commited = commitTransaction();
        return null;
      }
      // Change the query to use part_vals instead of the name which is
      // redundant TODO: callers of this often get part_vals out of name for no reason...
      String name =
          Warehouse.makePartName(convertToFieldSchemas(mtbl.getPartitionKeys()), part_vals);
      query =
          pm.newQuery(MPartition.class,
              "table.tableName == t1 && table.database.name == t2 && partitionName == t3");
      query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3");
      mparts = (List<MPartition>) query.execute(tableName, dbName, name);
      // Force-load the results before the transaction is committed.
      pm.retrieveAll(mparts);
      commited = commitTransaction();
      // We need to compare partition name with requested name since some DBs
      // (like MySQL, Derby) considers 'a' = 'a ' whereas others like (Postgres,
      // Oracle) doesn't exhibit this problem.
      if (mparts != null && mparts.size() > 0) {
        if (mparts.size() > 1) {
          throw new MetaException(
              "Expecting only one partition but more than one partitions are found.");
        } else {
          MPartition mpart = mparts.get(0);
          if (name.equals(mpart.getPartitionName())) {
            ret = mpart;
          } else {
            throw new MetaException("Expecting a partition with name " + name
                + ", but metastore is returning a partition with name " + mpart.getPartitionName()
                + ".");
          }
        }
      }
    } finally {
      // Rolls back if not committed and closes the query in either case.
      rollbackAndCleanup(commited, query);
    }
    return ret;
  }
+
  /**
   * Convert a Partition object into an MPartition, which is an object backed by the db
   * If the Partition's set of columns is the same as the parent table's AND useTableCD
   * is true, then this partition's storage descriptor's column descriptor will point
   * to the same one as the table's storage descriptor.
   * @param part the partition to convert; may be null, in which case null is returned
   * @param useTableCD whether to try to use the parent table's column descriptor.
   * @return the model partition object
   * @throws InvalidObjectException when the partition's table/database cannot be found
   * @throws MetaException propagated from the storage-descriptor conversion
   */
  private MPartition convertToMPart(Partition part, boolean useTableCD)
      throws InvalidObjectException, MetaException {
    if (part == null) {
      return null;
    }
    MTable mt = getMTable(part.getDbName(), part.getTableName());
    if (mt == null) {
      throw new InvalidObjectException(
          "Partition doesn't have a valid table or database name");
    }

    // If this partition's set of columns is the same as the parent table's,
    // use the parent table's, so we do not create a duplicate column descriptor,
    // thereby saving space
    MStorageDescriptor msd;
    if (useTableCD &&
        mt.getSd() != null && mt.getSd().getCD() != null &&
        mt.getSd().getCD().getCols() != null &&
        part.getSd() != null &&
        convertToFieldSchemas(mt.getSd().getCD().getCols()).
        equals(part.getSd().getCols())) {
      // Columns match: share the table's column descriptor.
      msd = convertToMStorageDescriptor(part.getSd(), mt.getSd().getCD());
    } else {
      // Columns differ (or sharing disabled): build a fresh column descriptor.
      msd = convertToMStorageDescriptor(part.getSd());
    }

    // The partition name is derived from the table's partition keys plus this
    // partition's values.
    return new MPartition(Warehouse.makePartName(convertToFieldSchemas(mt
        .getPartitionKeys()), part.getValues()), mt, part.getValues(), part
        .getCreateTime(), part.getLastAccessTime(),
        msd, part.getParameters());
  }
+
+  private Partition convertToPart(MPartition mpart) throws MetaException {
+    if (mpart == null) {
+      return null;
+    }
+    return new Partition(convertList(mpart.getValues()), 
mpart.getTable().getDatabase()
+        .getName(), mpart.getTable().getTableName(), mpart.getCreateTime(),
+        mpart.getLastAccessTime(), convertToStorageDescriptor(mpart.getSd()),
+        convertMap(mpart.getParameters()));
+  }
+
+  private Partition convertToPart(String dbName, String tblName, MPartition 
mpart)
+      throws MetaException {
+    if (mpart == null) {
+      return null;
+    }
+    return new Partition(convertList(mpart.getValues()), dbName, tblName,
+        mpart.getCreateTime(), mpart.getLastAccessTime(),
+        convertToStorageDescriptor(mpart.getSd(), false), 
convertMap(mpart.getParameters()));
+  }
+
+  @Override
+  public boolean dropPartition(String dbName, String tableName,
+    List<String> part_vals) throws MetaException, NoSuchObjectException, 
InvalidObjectException,
+    InvalidInputException {
+    boolean success = false;
+    try {
+      openTransaction();
+      MPartition part = getMPartition(dbName, tableName, part_vals);
+      dropPartitionCommon(part);
+      success = commitTransaction();
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+
+  @Override
+  public void dropPartitions(String dbName, String tblName, List<String> 
partNames)
+      throws MetaException, NoSuchObjectException {
+    if (partNames.isEmpty()) return;
+    boolean success = false;
+    openTransaction();
+    try {
+      // Delete all things.
+      dropPartitionGrantsNoTxn(dbName, tblName, partNames);
+      dropPartitionAllColumnGrantsNoTxn(dbName, tblName, partNames);
+      dropPartitionColumnStatisticsNoTxn(dbName, tblName, partNames);
+
+      // CDs are reused; go thry partition SDs, detach all CDs from SDs, then 
remove unused CDs.
+      for (MColumnDescriptor mcd : detachCdsFromSdsNoTxn(dbName, tblName, 
partNames)) {
+        removeUnusedColumnDescriptor(mcd);
+      }
+      dropPartitionsNoTxn(dbName, tblName, partNames);
+      if (!(success = commitTransaction())) {
+        throw new MetaException("Failed to drop partitions"); // Should not 
happen?
+      }
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+  }
+
  /**
   * Drop an MPartition and cascade deletes (e.g., delete partition privilege grants,
   *   drop the storage descriptor cleanly, etc.)
   * @param part - the MPartition to drop; a null part is a no-op (the
   *   transaction is still committed)
   * @return whether the transaction committed successfully
   * @throws InvalidInputException
   * @throws InvalidObjectException
   * @throws MetaException
   * @throws NoSuchObjectException
   */
  private boolean dropPartitionCommon(MPartition part) throws NoSuchObjectException, MetaException,
    InvalidObjectException, InvalidInputException {
    boolean success = false;
    try {
      openTransaction();
      if (part != null) {
        // Rebuild the partition name from the table's partition key names and
        // this partition's values.
        List<MFieldSchema> schemas = part.getTable().getPartitionKeys();
        List<String> colNames = new ArrayList<>();
        for (MFieldSchema col: schemas) {
          colNames.add(col.getName());
        }
        String partName = FileUtils.makePartName(colNames, part.getValues());

        // Delete partition-level privilege grants.
        List<MPartitionPrivilege> partGrants = listPartitionGrants(
            part.getTable().getDatabase().getName(),
            part.getTable().getTableName(),
            Lists.newArrayList(partName));

        if (partGrants != null && partGrants.size() > 0) {
          pm.deletePersistentAll(partGrants);
        }

        // Delete partition-level column privilege grants.
        List<MPartitionColumnPrivilege> partColumnGrants = listPartitionAllColumnGrants(
            part.getTable().getDatabase().getName(),
            part.getTable().getTableName(),
            Lists.newArrayList(partName));
        if (partColumnGrants != null && partColumnGrants.size() > 0) {
          pm.deletePersistentAll(partColumnGrants);
        }

        String dbName = part.getTable().getDatabase().getName();
        String tableName = part.getTable().getTableName();

        // delete partition level column stats if it exists
        try {
          deletePartitionColumnStatistics(dbName, tableName, partName, part.getValues(), null);
        } catch (NoSuchObjectException e) {
          // Best-effort: missing stats are not an error for a drop.
          LOG.info("No column statistics records found to delete");
        }

        // Storage-descriptor cleanup must run before the partition row itself
        // is deleted.
        preDropStorageDescriptor(part.getSd());
        pm.deletePersistent(part);
      }
      success = commitTransaction();
    } finally {
      if (!success) {
        rollbackTransaction();
      }
    }
    return success;
  }
+
+  @Override
+  public List<Partition> getPartitions(
+      String dbName, String tableName, int maxParts) throws MetaException, 
NoSuchObjectException {
+    return getPartitionsInternal(dbName, tableName, maxParts, true, true);
+  }
+
  /**
   * Core partition-listing implementation.  Runs through GetListHelper, which
   * chooses between the direct-SQL path and the JDO path based on the allow
   * flags (and the helper's own fallback logic).
   *
   * @param dbName database name
   * @param tblName table name
   * @param maxParts maximum number of partitions to return; negative = no limit
   * @param allowSql whether the direct-SQL path may be used
   * @param allowJdo whether the JDO path may be used
   */
  protected List<Partition> getPartitionsInternal(
      String dbName, String tblName, final int maxParts, boolean allowSql, boolean allowJdo)
          throws MetaException, NoSuchObjectException {
    return new GetListHelper<Partition>(dbName, tblName, allowSql, allowJdo) {
      @Override
      protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx) throws MetaException {
        // Direct SQL takes null for "no limit"; map negative maxParts to that.
        Integer max = (maxParts < 0) ? null : maxParts;
        return directSql.getPartitions(dbName, tblName, max);
      }
      @Override
      protected List<Partition> getJdoResult(
          GetHelper<List<Partition>> ctx) throws MetaException {
        QueryWrapper queryWrapper = new QueryWrapper();
        try {
          return convertToParts(listMPartitions(dbName, tblName, maxParts, queryWrapper));
        } finally {
          // Always release the JDO query resources.
          queryWrapper.close();
        }
      }
    }.run(false);
  }
+
+  @Override
+  public List<Partition> getPartitionsWithAuth(String dbName, String tblName,
+      short max, String userName, List<String> groupNames)
+          throws MetaException, InvalidObjectException {
+    boolean success = false;
+    QueryWrapper queryWrapper = new QueryWrapper();
+
+    try {
+      openTransaction();
+      List<MPartition> mparts = listMPartitions(dbName, tblName, max, 
queryWrapper);
+      List<Partition> parts = new ArrayList<>(mparts.size());
+      if (mparts != null && mparts.size()>0) {
+        for (MPartition mpart : mparts) {
+          MTable mtbl = mpart.getTable();
+          Partition part = convertToPart(mpart);
+          parts.add(part);
+
+          if 
("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE")))
 {
+            String partName = 
Warehouse.makePartName(this.convertToFieldSchemas(mtbl
+                .getPartitionKeys()), part.getValues());
+            PrincipalPrivilegeSet partAuth = 
this.getPartitionPrivilegeSet(dbName,
+                tblName, partName, userName, groupNames);
+            part.setPrivileges(partAuth);
+          }
+        }
+      }
+      success =  commitTransaction();
+      return parts;
+    } finally {
+      rollbackAndCleanup(success, queryWrapper);
+    }
+  }
+
+  @Override
+  public Partition getPartitionWithAuth(String dbName, String tblName,
+      List<String> partVals, String user_name, List<String> group_names)
+      throws NoSuchObjectException, MetaException, InvalidObjectException {
+    boolean success = false;
+    try {
+      openTransaction();
+      MPartition mpart = getMPartition(dbName, tblName, partVals);
+      if (mpart == null) {
+        commitTransaction();
+        throw new NoSuchObjectException("partition values="
+            + partVals.toString());
+      }
+      Partition part = null;
+      MTable mtbl = mpart.getTable();
+      part = convertToPart(mpart);
+      if 
("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE")))
 {
+        String partName = 
Warehouse.makePartName(this.convertToFieldSchemas(mtbl
+            .getPartitionKeys()), partVals);
+        PrincipalPrivilegeSet partAuth = this.getPartitionPrivilegeSet(dbName,
+            tblName, partName, user_name, group_names);
+        part.setPrivileges(partAuth);
+      }
+
+      success = commitTransaction();
+      return part;
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  private List<Partition> convertToParts(List<MPartition> mparts) throws 
MetaException {
+    return convertToParts(mparts, null);
+  }
+
+  private List<Partition> convertToParts(List<MPartition> src, List<Partition> 
dest)
+      throws MetaException {
+    if (src == null) {
+      return dest;
+    }
+    if (d

<TRUNCATED>

Reply via email to