This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new acb1bd1  HIVE-23032: Add batching in Lock generation (Denys Kuzmenko, reviewed by Peter Vary)
acb1bd1 is described below

commit acb1bd11abedad910a2b2ed04d91e461fda2de42
Author: Denys Kuzmenko <dkuzme...@apache.org>
AuthorDate: Mon Mar 30 09:28:57 2020 +0200

    HIVE-23032: Add batching in Lock generation (Denys Kuzmenko, reviewed by Peter Vary)
---
 .../datasource/BoneCPDataSourceProvider.java       |  25 ++-
 .../datasource/DbCPDataSourceProvider.java         |  26 ++-
 .../datasource/HikariCPDataSourceProvider.java     |  20 ++-
 .../hadoop/hive/metastore/txn/TxnDbUtil.java       |  64 +++----
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 195 ++++++++-------------
 5 files changed, 161 insertions(+), 169 deletions(-)

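In substance, the patch replaces the generated per-row INSERT statements for HIVE_LOCKS with a single parameterized INSERT executed through JDBC batching, flushed every DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE rows, and turns on driver-side batch rewriting for MySQL and PostgreSQL. A minimal sketch of that batching pattern, with a hypothetical table, URL, credentials, and sizes (not the metastore code itself):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchInsertSketch {
  public static void main(String[] args) throws SQLException {
    final int batchSize = 1000;  // stands in for DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE
    final int rows = 2500;       // placeholder row count
    try (Connection conn = DriverManager.getConnection(
             "jdbc:postgresql://localhost/demo", "user", "passwd");  // placeholder URL/credentials
         PreparedStatement ps = conn.prepareStatement(
             "INSERT INTO demo_locks (ext_id, int_id) VALUES (?, ?)")) {  // placeholder table
      conn.setAutoCommit(false);
      for (int i = 1; i <= rows; i++) {
        ps.setLong(1, 7L);       // placeholder external lock id
        ps.setLong(2, i);
        ps.addBatch();
        if (i % batchSize == 0) {
          ps.executeBatch();     // flush each full batch
        }
      }
      if (rows % batchSize != 0) {
        ps.executeBatch();       // flush the remainder
      }
      conn.commit();
    }
  }
}

The same two flush points (every full batch, then the remainder) appear in the TxnHandler hunk further below.
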
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
index a64f311..f92ce73 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
@@ -33,6 +33,7 @@ import com.jolbox.bonecp.BoneCPConfig;
 import com.jolbox.bonecp.BoneCPDataSource;
 import com.jolbox.bonecp.StatisticsMBean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.slf4j.Logger;
@@ -54,12 +55,12 @@ public class BoneCPDataSourceProvider implements DataSourceProvider {
 
   @Override
   public DataSource create(Configuration hdpConfig) throws SQLException {
-
     LOG.debug("Creating BoneCP connection pool for the MetaStore");
 
     String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
     String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
     String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);
+
     int maxPoolSize = MetastoreConf.getIntVar(hdpConfig,
         MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
 
@@ -67,26 +68,38 @@ public class BoneCPDataSourceProvider implements DataSourceProvider {
     long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L);
     String partitionCount = properties.getProperty(PARTITION_COUNT_PROPERTY, "1");
 
-    BoneCPConfig config = null;
+    BoneCPConfig config;
     try {
       config = new BoneCPConfig(properties);
     } catch (Exception e) {
       throw new SQLException("Cannot create BoneCP configuration: ", e);
     }
     config.setJdbcUrl(driverUrl);
+    config.setUser(user);
+    config.setPassword(passwd);
     // if we are waiting for connection for a long time, something is really wrong
     // better raise an error than hang forever
     // see DefaultConnectionStrategy.getConnectionInternal()
     config.setConnectionTimeoutInMs(connectionTimeout);
     config.setMaxConnectionsPerPartition(maxPoolSize);
     config.setPartitionCount(Integer.parseInt(partitionCount));
-    config.setUser(user);
-    config.setPassword(passwd);
 
-    if (determineDatabaseProduct(driverUrl) == MYSQL) {
-      config.setInitSQL("SET @@session.sql_mode=ANSI_QUOTES");
+    Properties connProperties = new Properties();
+
+    DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
+    switch (dbProduct) {
+      case MYSQL:
+        connProperties.put("rewriteBatchedStatements", true);
+        break;
+      case POSTGRES:
+        connProperties.put("reWriteBatchedInserts", true);
+        break;
     }
+    config.setDriverProperties(connProperties);
 
+    if (dbProduct == MYSQL) {
+      config.setInitSQL("SET @@session.sql_mode=ANSI_QUOTES");
+    }
     return initMetrics(new BoneCPDataSource(config));
   }
 
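For the flags wired up above: JDBC batching already saves round trips on its own, and the two connector properties additionally let the driver rewrite an accumulated batch into multi-row INSERT statements -- rewriteBatchedStatements for MySQL Connector/J, and the case-sensitive reWriteBatchedInserts for the PostgreSQL driver. Both can equally be passed on the JDBC URL, as in this sketch (placeholder host, database, and credentials):

import java.sql.Connection;
import java.sql.DriverManager;

public class BatchRewriteFlags {
  public static void main(String[] args) throws Exception {
    // MySQL Connector/J: collapse addBatch()'d INSERTs into multi-row INSERTs.
    Connection mysql = DriverManager.getConnection(
        "jdbc:mysql://db-host/metastore?rewriteBatchedStatements=true", "hive", "passwd");

    // PostgreSQL JDBC: same idea, different (case-sensitive) property name.
    Connection postgres = DriverManager.getConnection(
        "jdbc:postgresql://db-host/metastore?reWriteBatchedInserts=true", "hive", "passwd");

    mysql.close();
    postgres.close();
  }
}
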
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
index 6a1ed8f..85719fd 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.hive.metastore.datasource;
 
-import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.commons.dbcp.ConnectionFactory;
-import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.DataSourceConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,12 +57,26 @@ public class DbCPDataSourceProvider implements DataSourceProvider {
 
   @Override
   public DataSource create(Configuration hdpConfig) throws SQLException {
-
     LOG.debug("Creating dbcp connection pool for the MetaStore");
 
     String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
     String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
     String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);
+
+    BasicDataSource dbcpDs = new BasicDataSource();
+    dbcpDs.setUrl(driverUrl);
+    dbcpDs.setUsername(user);
+    dbcpDs.setPassword(passwd);
+
+    DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
+    switch (dbProduct) {
+      case MYSQL:
+        dbcpDs.setConnectionProperties("rewriteBatchedStatements=true");
+        break;
+      case POSTGRES:
+        dbcpDs.setConnectionProperties("reWriteBatchedInserts=true");
+        break;
+    }
     int maxPoolSize = hdpConfig.getInt(
             MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(),
             ((Long) MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getDefaultVal()).intValue());
@@ -97,17 +112,16 @@ public class DbCPDataSourceProvider implements DataSourceProvider {
     objectPool.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
     objectPool.setLifo(lifo);
 
-    ConnectionFactory connFactory = new DriverManagerConnectionFactory(driverUrl, user, passwd);
+    ConnectionFactory connFactory = new DataSourceConnectionFactory(dbcpDs);
     // This doesn't get used, but it's still necessary, see
     // https://git1-us-west.apache.org/repos/asf?p=commons-dbcp.git;a=blob;f=doc/ManualPoolingDataSourceExample.java;
     // h=f45af2b8481f030b27364e505984c0eef4f35cdb;hb=refs/heads/DBCP_1_5_x_BRANCH
     PoolableConnectionFactory poolableConnFactory =
         new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
 
-    if (determineDatabaseProduct(driverUrl) == MYSQL) {
+    if (dbProduct == MYSQL) {
       poolableConnFactory.setValidationQuery("SET @@session.sql_mode=ANSI_QUOTES");
     }
-
     return new PoolingDataSource(objectPool);
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 333610d..76bbf3b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.slf4j.Logger;
@@ -30,7 +31,6 @@ import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Properties;
 
-import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
 import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct;
 
 /**
@@ -45,19 +45,20 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
 
   @Override
   public DataSource create(Configuration hdpConfig) throws SQLException {
-
     LOG.debug("Creating Hikari connection pool for the MetaStore");
 
     String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
     String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
     String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);
+
     int maxPoolSize = MetastoreConf.getIntVar(hdpConfig,
         MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
 
     Properties properties = replacePrefix(
         DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI));
     long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L);
-    HikariConfig config = null;
+
+    HikariConfig config;
     try {
       config = new HikariConfig(properties);
     } catch (Exception e) {
@@ -68,12 +69,19 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
     config.setUsername(user);
     config.setPassword(passwd);
 
-    if (determineDatabaseProduct(driverUrl) == MYSQL) {
-      config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES");
-    }
     //https://github.com/brettwooldridge/HikariCP
     config.setConnectionTimeout(connectionTimeout);
 
+    DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
+    switch (dbProduct) {
+      case MYSQL:
+        config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES");
+        config.addDataSourceProperty("rewriteBatchedStatements", true);
+        break;
+      case POSTGRES:
+        config.addDataSourceProperty("reWriteBatchedInserts", true);
+        break;
+    }
     return new HikariDataSource(initMetrics(config));
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index efcf2e1..ef88240 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -25,17 +25,22 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLTransactionRollbackException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.EnumMap;
 import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.zookeeper.txn.TxnHeader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hive.metastore.DatabaseProduct.*;
+
 /**
  * Utility methods for creating and destroying txn database/schema, plus methods for
  * querying against metastore tables.
@@ -43,9 +48,20 @@ import org.slf4j.LoggerFactory;
  */
 public final class TxnDbUtil {
 
-  static final private Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
   private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
 
+  private static final EnumMap<DatabaseProduct, String> DB_EPOCH_FN =
+      new EnumMap<DatabaseProduct, String>(DatabaseProduct.class) {{
+        put(DERBY, "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) +
+            "'), current_timestamp) } / 1000000");
+        put(MYSQL, "round(unix_timestamp(now(3)) * 1000)");
+        put(POSTGRES, "round(extract(epoch from current_timestamp) * 1000)");
+        put(ORACLE, "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " +
+            "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)");
+        put(SQLSERVER, "datediff_big(millisecond, '19700101', sysutcdatetime())");
+      }};
+
   private static int deadlockCnt = 0;
 
   private TxnDbUtil() {
@@ -500,35 +516,6 @@ public final class TxnDbUtil {
   }
 
   /**
-   * Return true if the transaction of the given txnId is open.
-   * @param conf    HiveConf
-   * @param txnId   transaction id to search for
-   * @return
-   * @throws Exception
-   */
-  public static boolean isOpenOrAbortedTransaction(Configuration conf, long txnId) throws Exception {
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      conn = getConnection(conf);
-      conn.setAutoCommit(false);
-      conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-
-      stmt = conn.prepareStatement("SELECT txn_id FROM TXNS WHERE txn_id = ?");
-      stmt.setLong(1, txnId);
-      rs = stmt.executeQuery();
-      if (!rs.next()) {
-        return false;
-      } else {
-        return true;
-      }
-    } finally {
-      closeResources(conn, stmt, rs);
-    }
-  }
-
-  /**
   * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables
    * @param countQuery countQuery text
    * @return count countQuery result
@@ -627,4 +614,19 @@ public final class TxnDbUtil {
       }
     }
   }
+
+  /**
+   * Get the database-specific function which returns the number of milliseconds since the epoch.
+   * @throws MetaException For unknown database type.
+   */
+  static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
+    String epochFn = DB_EPOCH_FN.get(dbProduct);
+    if (epochFn != null) {
+      return epochFn;
+    } else {
+      String msg = "Unknown database product: " + dbProduct.toString();
+      LOG.error(msg);
+      throw new MetaException(msg);
+    }
+  }
 }
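The DB_EPOCH_FN table and getEpochFn above replace TxnHandler's old getDbEpochString() switch (removed later in this patch) with a single lookup of engine-specific SQL expressions, each evaluating server-side to milliseconds since the Unix epoch. A toy version of the pattern -- enum, SQL fragments, and exception type all abbreviated relative to the real class:

import java.util.EnumMap;

enum Db { MYSQL, POSTGRES }

class EpochFnDemo {
  private static final EnumMap<Db, String> EPOCH_FN = new EnumMap<>(Db.class);
  static {
    EPOCH_FN.put(Db.MYSQL, "round(unix_timestamp(now(3)) * 1000)");
    EPOCH_FN.put(Db.POSTGRES, "round(extract(epoch from current_timestamp) * 1000)");
  }

  static String epochFn(Db db) {
    String fn = EPOCH_FN.get(db);
    if (fn == null) {
      // The real getEpochFn throws MetaException here.
      throw new IllegalArgumentException("Unknown database product: " + db);
    }
    return fn;
  }

  public static void main(String[] args) {
    // Splice the engine-specific expression into a heartbeat update, as the
    // TxnHandler call sites below do with TxnDbUtil.getEpochFn(dbProduct).
    String sql = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + epochFn(Db.POSTGRES)
        + " WHERE \"TXN_ID\" = 42";
    System.out.println(sql);
  }
}
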
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 06defdb..74ef885 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -27,7 +27,6 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Savepoint;
 import java.sql.Statement;
-import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -178,12 +177,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   static final protected char LOCK_SHARED = 'r';
   static final protected char LOCK_SEMI_SHARED = 'w';
 
-  static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
+  private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
+  private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
 
-  static private DataSource connPool;
+  private static DataSource connPool;
   private static DataSource connPoolMutex;
-  static private boolean doRetryOnConnPool = false;
+  private static boolean doRetryOnConnPool = false;
+
+  // Query definitions
+  private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO 
\"HIVE_LOCKS\" ( " +
+      "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", 
\"HL_TABLE\", \"HL_PARTITION\", " +
+      "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", 
\"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
+      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
@@ -611,11 +616,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       params.add(rqst.getUser());
       params.add(rqst.getHostname());
       List<List<String>> paramsList = new ArrayList<>(numTxns);
-      String dbEpochString = getDbEpochString();
+
       for (long i = first; i < first + numTxns; i++) {
         txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?,"
-                     + txnType.getValue());
+        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + ","
+                + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue());
         paramsList.add(params);
       }
       insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
@@ -2520,73 +2525,59 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           insertPreparedStmts = null;
         }
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
+        String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
+        String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
+
+        int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
         long intLockId = 0;
-        String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString());
-        for (LockComponent lc : rqst.getComponent()) {
-          if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
-            (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
-            //old version of thrift client should have (lc.isSetOperationType() == false) but they do not
-            //If you add a default value to a variable, isSet() for that variable is true regardless of the where the
-            //message was created (for object variables.
-            // It works correctly for boolean vars, e.g. LockComponent.isTransactional).
-            //in test mode, upgrades are not tested, so client version and server version of thrift always matches so
-            //we see UNSET here it means something didn't set the appropriate value.
-            throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
-              + lc + " agentInfo=" + rqst.getAgentInfo());
-          }
-          intLockId++;
-          String dbName = normalizeCase(lc.getDbname());
-          String tblName = normalizeCase(lc.getTablename());
-          String partName = normalizeCase(lc.getPartitionname());
-          LockType lockType = lc.getType();
-          char lockChar = 'z';
-          switch (lockType) {
-            case EXCLUSIVE:
-              lockChar = LOCK_EXCLUSIVE;
-              break;
-            case SHARED_READ:
-              lockChar = LOCK_SHARED;
-              break;
-            case SHARED_WRITE:
-              lockChar = LOCK_SEMI_SHARED;
-              break;
-          }
-          rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
-                  ((tblName == null) ? "null" : "?") + ", " +
-                  ((partName == null) ? "null" : "?") + ", " +
-                  quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
-                  //for locks associated with a txn, we always heartbeat txn and timeout based on that
-                  lastHBString + ", " +
-                  ((rqst.getUser() == null) ? "null" : "?")  + ", " +
-                  ((rqst.getHostname() == null) ? "null" : "?") + ", " +
-                  ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
-          List<String> params = new ArrayList<>();
-          params.add(dbName);
-          if (tblName != null) {
-            params.add(tblName);
-          }
-          if (partName != null) {
-            params.add(partName);
-          }
-          if (rqst.getUser() != null) {
-            params.add(rqst.getUser());
-          }
-          if (rqst.getHostname() != null) {
-            params.add(rqst.getHostname());
+
+        try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
+          for (LockComponent lc : rqst.getComponent()) {
+            if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
+              (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
+              //Old versions of the thrift client should have (lc.isSetOperationType() == false), but they do not.
+              //If you add a default value to a variable, isSet() for that variable is true regardless of where the
+              //message was created (for object variables).
+              //It works correctly for boolean vars, e.g. LockComponent.isTransactional.
+              //In test mode, upgrades are not tested, so the client and server versions of thrift always match;
+              //if we see UNSET here, it means something didn't set the appropriate value.
+              throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+                + lc + " agentInfo=" + rqst.getAgentInfo());
+            }
+            intLockId++;
+
+            char lockChar = 'z';
+            switch (lc.getType()) {
+              case EXCLUSIVE:
+                lockChar = LOCK_EXCLUSIVE;
+                break;
+              case SHARED_READ:
+                lockChar = LOCK_SHARED;
+                break;
+              case SHARED_WRITE:
+                lockChar = LOCK_SEMI_SHARED;
+                break;
+            }
+            pstmt.setLong(1, extLockId);
+            pstmt.setLong(2, intLockId);
+            pstmt.setLong(3, txnid);
+            pstmt.setString(4, normalizeCase(lc.getDbname()));
+            pstmt.setString(5, normalizeCase(lc.getTablename()));
+            pstmt.setString(6, normalizeCase(lc.getPartitionname()));
+            pstmt.setString(7, Character.toString(LOCK_WAITING));
+            pstmt.setString(8, Character.toString(lockChar));
+            pstmt.setString(9, rqst.getUser());
+            pstmt.setString(10, rqst.getHostname());
+            pstmt.setString(11, rqst.getAgentInfo());
+
+            pstmt.addBatch();
+            if (intLockId % batchSize == 0) {
+              pstmt.executeBatch();
+            }
           }
-          if (rqst.getAgentInfo() != null) {
-            params.add(rqst.getAgentInfo());
+          if (intLockId % batchSize != 0) {
+            pstmt.executeBatch();
           }
-          paramsList.add(params);
-        }
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-          "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " +
-            "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " +
-            "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList);
-        for(PreparedStatement pst : insertPreparedStmts) {
-          int modCount = pst.executeUpdate();
         }
         dbConn.commit();
         success = true;
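To sanity-check the flush cadence above: with a batch size of 1000 and, say, 2500 lock components, executeBatch() fires inside the loop after components 1000 and 2000, and the trailing intLockId % batchSize != 0 flush sends the remaining 500; when the component count divides evenly, the final flush is skipped rather than re-sent. This works because intLockId is incremented exactly once per component.
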
@@ -2961,7 +2952,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           txnIds.add(txn);
         }
         TxnUtils.buildQueryWithINClause(conf, queries,
-          new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + 
getDbEpochString() +
+          new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + 
TxnDbUtil.getEpochFn(dbProduct) +
             " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "),
           new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false);
         int updateCnt = 0;
@@ -3926,37 +3917,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   /**
-   * Returns the database specific query string representation which will return the milliseconds
-   * value after epoch.
-   * @return The string which will insert the current timestamp milliseconds value
-   * @throws MetaException For unknown database type
-   */
-  private static String epochInCurrentTimezone = null;
-  protected String getDbEpochString() throws MetaException {
-    switch (dbProduct) {
-      case DERBY:
-        if (epochInCurrentTimezone == null) {
-          epochInCurrentTimezone = new Timestamp(0).toString();
-        }
-        return "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + 
epochInCurrentTimezone +
-          "'), current_timestamp) } / 1000000";
-      case MYSQL:
-        return "round(unix_timestamp(curtime(4)) * 1000)";
-      case POSTGRES:
-        return "round(extract(epoch from current_timestamp) * 1000)";
-      case ORACLE:
-        return "(cast(systimestamp at time zone 'UTC' as date) - date 
'1970-01-01')*24*60*60*1000 " +
-          "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as 
int)";
-      case SQLSERVER:
-        return "datediff_big(millisecond, '19700101', sysutcdatetime())";
-      default:
-        String msg = "Unknown database product: " + dbProduct.toString();
-        LOG.error(msg);
-        throw new MetaException(msg);
-    }
-  }
-
-  /**
    * Determine the current time, using the RDBMS as a source of truth
    * @param conn database connection
    * @return current time in milliseconds
@@ -4276,7 +4236,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + 
quoteChar(TXN_ABORTED) +
         " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
       if(checkHeartbeat) {
-        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < 
").append(getDbEpochString()).append("-").append(timeout);
+        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ");
+        
suffix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
       }
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
@@ -4524,8 +4485,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     long extLockId = locksBeingChecked.get(0).extLockId;
     String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + 
LOCK_ACQUIRED + "', " +
       //if lock is part of txn, heartbeat info is in txn record
-      "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : 
getDbEpochString()) +
-      ", \"HL_ACQUIRED_AT\" = " + getDbEpochString() + 
",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
+      "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : 
TxnDbUtil.getEpochFn(dbProduct)) +
+      ",\"HL_ACQUIRED_AT\" = " + TxnDbUtil.getEpochFn(dbProduct) +
+      ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
       " WHERE \"HL_LOCK_EXT_ID\" = " +  extLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
@@ -4570,7 +4532,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       stmt = dbConn.createStatement();
 
       String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " +
-        getDbEpochString() + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
+          TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
       if (rc < 1) {
@@ -4593,7 +4555,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
-      String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + 
getDbEpochString() +
+      String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + 
TxnDbUtil.getEpochFn(dbProduct) +
         " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + 
"'";
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
@@ -4849,8 +4811,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       stmt = dbConn.createStatement();
       //doing a SELECT first is less efficient but makes it easier to debug things
       String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " +
-          getDbEpochString() + "-" + timeout + " AND \"HL_TXNID\" = 0"; //when txnid is <> 0, the lock is
-      //associated with a txn and is handled by performTimeOuts()
+          TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"HL_TXNID\" = 0";
+      //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts()
       //want to avoid expiring locks for a txn w/o expiring the txn itself
       List<Long> extLockIDs = new ArrayList<>();
       rs = stmt.executeQuery(s);
@@ -4870,7 +4832,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
       //include same hl_last_heartbeat condition in case someone heartbeated since the select
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < ");
-      prefix.append(getDbEpochString()).append("-").append(timeout);
+      prefix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
       prefix.append(" AND \"HL_TXNID\" = 0 AND ");
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false);
@@ -4927,7 +4889,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       while(true) {
         stmt = dbConn.createStatement();
         String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + 
TXN_OPEN +
-            "' AND \"TXN_LAST_HEARTBEAT\" <  " + getDbEpochString() + "-" + 
timeout +
+            "' AND \"TXN_LAST_HEARTBEAT\" <  " + 
TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout +
             " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
         //safety valve for extreme cases
         s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
@@ -5099,13 +5061,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private static String getMessage(SQLException ex) {
     return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" + ex.getErrorCode() + ")";
   }
-  /**
-   * Useful for building SQL strings
-   * @param value may be {@code null}
-   */
-  private static String valueOrNullLiteral(String value) {
-    return value == null ? "null" : quoteString(value);
-  }
   static String quoteString(String input) {
     return "'" + input + "'";
   }
