This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new acb1bd1 HIVE-23032: Add batching in Lock generation (Denys Kuzmenko, reviewed by Peter Vary) acb1bd1 is described below commit acb1bd11abedad910a2b2ed04d91e461fda2de42 Author: Denys Kuzmenko <dkuzme...@apache.org> AuthorDate: Mon Mar 30 09:28:57 2020 +0200 HIVE-23032: Add batching in Lock generation (Denys Kuzmenko, reviewed by Peter Vary) --- .../datasource/BoneCPDataSourceProvider.java | 25 ++- .../datasource/DbCPDataSourceProvider.java | 26 ++- .../datasource/HikariCPDataSourceProvider.java | 20 ++- .../hadoop/hive/metastore/txn/TxnDbUtil.java | 64 +++---- .../hadoop/hive/metastore/txn/TxnHandler.java | 195 ++++++++------------- 5 files changed, 161 insertions(+), 169 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java index a64f311..f92ce73 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java @@ -33,6 +33,7 @@ import com.jolbox.bonecp.BoneCPConfig; import com.jolbox.bonecp.BoneCPDataSource; import com.jolbox.bonecp.StatisticsMBean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.slf4j.Logger; @@ -54,12 +55,12 @@ public class BoneCPDataSourceProvider implements DataSourceProvider { @Override public DataSource create(Configuration hdpConfig) throws SQLException { - LOG.debug("Creating BoneCP connection pool for the MetaStore"); String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig); String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig); String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig); + int maxPoolSize = MetastoreConf.getIntVar(hdpConfig, MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS); @@ -67,26 +68,38 @@ public class BoneCPDataSourceProvider implements DataSourceProvider { long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L); String partitionCount = properties.getProperty(PARTITION_COUNT_PROPERTY, "1"); - BoneCPConfig config = null; + BoneCPConfig config; try { config = new BoneCPConfig(properties); } catch (Exception e) { throw new SQLException("Cannot create BoneCP configuration: ", e); } config.setJdbcUrl(driverUrl); + config.setUser(user); + config.setPassword(passwd); // if we are waiting for connection for a long time, something is really wrong // better raise an error than hang forever // see DefaultConnectionStrategy.getConnectionInternal() config.setConnectionTimeoutInMs(connectionTimeout); config.setMaxConnectionsPerPartition(maxPoolSize); config.setPartitionCount(Integer.parseInt(partitionCount)); - config.setUser(user); - config.setPassword(passwd); - if (determineDatabaseProduct(driverUrl) == MYSQL) { - config.setInitSQL("SET @@session.sql_mode=ANSI_QUOTES"); + Properties connProperties = new Properties(); + + DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl); + switch (dbProduct){ + case MYSQL: + connProperties.put("rewriteBatchedStatements", true); + break; + case POSTGRES: + connProperties.put("reWriteBatchedInserts", true); + break; } + config.setDriverProperties(connProperties); + if (dbProduct == MYSQL) { + config.setInitSQL("SET @@session.sql_mode=ANSI_QUOTES"); + } return initMetrics(new BoneCPDataSource(config)); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java index 6a1ed8f..85719fd 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hive.metastore.datasource; -import com.codahale.metrics.MetricRegistry; +import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbcp.ConnectionFactory; -import org.apache.commons.dbcp.DriverManagerConnectionFactory; +import org.apache.commons.dbcp.DataSourceConnectionFactory; import org.apache.commons.dbcp.PoolableConnectionFactory; import org.apache.commons.dbcp.PoolingDataSource; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,12 +57,26 @@ public class DbCPDataSourceProvider implements DataSourceProvider { @Override public DataSource create(Configuration hdpConfig) throws SQLException { - LOG.debug("Creating dbcp connection pool for the MetaStore"); String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig); String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig); String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig); + + BasicDataSource dbcpDs = new BasicDataSource(); + dbcpDs.setUrl(driverUrl); + dbcpDs.setUsername(user); + dbcpDs.setPassword(passwd); + + DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl); + switch (dbProduct){ + case MYSQL: + dbcpDs.setConnectionProperties("rewriteBatchedStatements=true"); + break; + case POSTGRES: + dbcpDs.setConnectionProperties("reWriteBatchedInserts=true"); + break; + } int maxPoolSize = hdpConfig.getInt( MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(), ((Long) MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getDefaultVal()).intValue()); @@ -97,17 +112,16 @@ public class DbCPDataSourceProvider implements DataSourceProvider { objectPool.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis); objectPool.setLifo(lifo); - ConnectionFactory connFactory = new DriverManagerConnectionFactory(driverUrl, user, passwd); + ConnectionFactory connFactory = new DataSourceConnectionFactory(dbcpDs); // This doesn't get used, but it's still necessary, see // https://git1-us-west.apache.org/repos/asf?p=commons-dbcp.git;a=blob;f=doc/ManualPoolingDataSourceExample.java; // h=f45af2b8481f030b27364e505984c0eef4f35cdb;hb=refs/heads/DBCP_1_5_x_BRANCH PoolableConnectionFactory poolableConnFactory = new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); - if (determineDatabaseProduct(driverUrl) == MYSQL) { + if (dbProduct == MYSQL) { poolableConnFactory.setValidationQuery("SET @@session.sql_mode=ANSI_QUOTES"); } - return new PoolingDataSource(objectPool); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java index 333610d..76bbf3b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java @@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.slf4j.Logger; @@ -30,7 +31,6 @@ import javax.sql.DataSource; import java.sql.SQLException; import java.util.Properties; -import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct; /** @@ -45,19 +45,20 @@ public class HikariCPDataSourceProvider implements DataSourceProvider { @Override public DataSource create(Configuration hdpConfig) throws SQLException { - LOG.debug("Creating Hikari connection pool for the MetaStore"); String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig); String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig); String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig); + int maxPoolSize = MetastoreConf.getIntVar(hdpConfig, MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS); Properties properties = replacePrefix( DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI)); long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L); - HikariConfig config = null; + + HikariConfig config; try { config = new HikariConfig(properties); } catch (Exception e) { @@ -68,12 +69,19 @@ public class HikariCPDataSourceProvider implements DataSourceProvider { config.setUsername(user); config.setPassword(passwd); - if (determineDatabaseProduct(driverUrl) == MYSQL) { - config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES"); - } //https://github.com/brettwooldridge/HikariCP config.setConnectionTimeout(connectionTimeout); + DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl); + switch (dbProduct){ + case MYSQL: + config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES"); + config.addDataSourceProperty("rewriteBatchedStatements", true); + break; + case POSTGRES: + config.addDataSourceProperty("reWriteBatchedInserts", true); + break; + } return new HikariDataSource(initMetrics(config)); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index efcf2e1..ef88240 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -25,17 +25,22 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLTransactionRollbackException; import java.sql.Statement; +import java.sql.Timestamp; +import java.util.EnumMap; import java.util.Properties; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.zookeeper.txn.TxnHeader; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hive.metastore.DatabaseProduct.*; + /** * Utility methods for creating and destroying txn database/schema, plus methods for * querying against metastore tables. @@ -43,9 +48,20 @@ import org.slf4j.LoggerFactory; */ public final class TxnDbUtil { - static final private Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName()); private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + private static final EnumMap<DatabaseProduct, String> DB_EPOCH_FN = + new EnumMap<DatabaseProduct, String>(DatabaseProduct.class) {{ + put(DERBY, "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) + + "'), current_timestamp) } / 1000000"); + put(MYSQL, "round(unix_timestamp(now(3)) * 1000)"); + put(POSTGRES, "round(extract(epoch from current_timestamp) * 1000)"); + put(ORACLE, "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " + + "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)"); + put(SQLSERVER, "datediff_big(millisecond, '19700101', sysutcdatetime())"); + }}; + private static int deadlockCnt = 0; private TxnDbUtil() { @@ -500,35 +516,6 @@ public final class TxnDbUtil { } /** - * Return true if the transaction of the given txnId is open. - * @param conf HiveConf - * @param txnId transaction id to search for - * @return - * @throws Exception - */ - public static boolean isOpenOrAbortedTransaction(Configuration conf, long txnId) throws Exception { - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - try { - conn = getConnection(conf); - conn.setAutoCommit(false); - conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - - stmt = conn.prepareStatement("SELECT txn_id FROM TXNS WHERE txn_id = ?"); - stmt.setLong(1, txnId); - rs = stmt.executeQuery(); - if (!rs.next()) { - return false; - } else { - return true; - } - } finally { - closeResources(conn, stmt, rs); - } - } - - /** * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables * @param countQuery countQuery text * @return count countQuery result @@ -627,4 +614,19 @@ public final class TxnDbUtil { } } } + + /** + * Get database specific function which returns the milliseconds value after the epoch. + * @throws MetaException For unknown database type. + */ + static String getEpochFn(DatabaseProduct dbProduct) throws MetaException { + String epochFn = DB_EPOCH_FN.get(dbProduct); + if (epochFn != null) { + return epochFn; + } else { + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 06defdb..74ef885 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -27,7 +27,6 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; -import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -178,12 +177,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { static final protected char LOCK_SHARED = 'r'; static final protected char LOCK_SEMI_SHARED = 'w'; - static final private int ALLOWED_REPEATED_DEADLOCKS = 10; - static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); + private static final int ALLOWED_REPEATED_DEADLOCKS = 10; + private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); - static private DataSource connPool; + private static DataSource connPool; private static DataSource connPoolMutex; - static private boolean doRetryOnConnPool = false; + private static boolean doRetryOnConnPool = false; + + // Query definitions + private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " + + "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " + + "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)"; private List<TransactionalMetaStoreEventListener> transactionalListeners; @@ -611,11 +616,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { params.add(rqst.getUser()); params.add(rqst.getHostname()); List<List<String>> paramsList = new ArrayList<>(numTxns); - String dbEpochString = getDbEpochString(); + for (long i = first; i < first + numTxns; i++) { txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?," - + txnType.getValue()); + rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," + + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue()); paramsList.add(params); } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, @@ -2520,73 +2525,59 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } insertPreparedStmts = null; } - List<String> rows = new ArrayList<>(); - List<List<String>> paramsList = new ArrayList<>(); + String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct); + String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB); + + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); long intLockId = 0; - String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString()); - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && - (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { - //old version of thrift client should have (lc.isSetOperationType() == false) but they do not - //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables. - // It works correctly for boolean vars, e.g. LockComponent.isTransactional). - //in test mode, upgrades are not tested, so client version and server version of thrift always matches so - //we see UNSET here it means something didn't set the appropriate value. - throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " - + lc + " agentInfo=" + rqst.getAgentInfo()); - } - intLockId++; - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: - lockChar = LOCK_EXCLUSIVE; - break; - case SHARED_READ: - lockChar = LOCK_SHARED; - break; - case SHARED_WRITE: - lockChar = LOCK_SEMI_SHARED; - break; - } - rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + - ((tblName == null) ? "null" : "?") + ", " + - ((partName == null) ? "null" : "?") + ", " + - quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - lastHBString + ", " + - ((rqst.getUser() == null) ? "null" : "?") + ", " + - ((rqst.getHostname() == null) ? "null" : "?") + ", " + - ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; - List<String> params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - if (rqst.getUser() != null) { - params.add(rqst.getUser()); - } - if (rqst.getHostname() != null) { - params.add(rqst.getHostname()); + + try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) { + for (LockComponent lc : rqst.getComponent()) { + if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && + (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { + //Old version of thrift client should have (lc.isSetOperationType() == false), but they do not + //If you add a default value to a variable, isSet() for that variable is true regardless of the where the + //message was created (for object variables). + //It works correctly for boolean vars, e.g. LockComponent.isTransactional). + //in test mode, upgrades are not tested, so client version and server version of thrift always matches so + //we see UNSET here it means something didn't set the appropriate value. + throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " + + lc + " agentInfo=" + rqst.getAgentInfo()); + } + intLockId++; + + char lockChar = 'z'; + switch (lc.getType()) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + pstmt.setLong(1, extLockId); + pstmt.setLong(2, intLockId); + pstmt.setLong(3, txnid); + pstmt.setString(4, normalizeCase(lc.getDbname())); + pstmt.setString(5, normalizeCase(lc.getTablename())); + pstmt.setString(6, normalizeCase(lc.getPartitionname())); + pstmt.setString(7, Character.toString(LOCK_WAITING)); + pstmt.setString(8, Character.toString(lockChar)); + pstmt.setString(9, rqst.getUser()); + pstmt.setString(10, rqst.getHostname()); + pstmt.setString(11, rqst.getAgentInfo()); + + pstmt.addBatch(); + if (intLockId % batchSize == 0) { + pstmt.executeBatch(); + } } - if (rqst.getAgentInfo() != null) { - params.add(rqst.getAgentInfo()); + if (intLockId % batchSize != 0) { + pstmt.executeBatch(); } - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " + - "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " + - "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList); - for(PreparedStatement pst : insertPreparedStmts) { - int modCount = pst.executeUpdate(); } dbConn.commit(); success = true; @@ -2961,7 +2952,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { txnIds.add(txn); } TxnUtils.buildQueryWithINClause(conf, queries, - new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() + + new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "), new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false); int updateCnt = 0; @@ -3926,37 +3917,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** - * Returns the database specific query string representation which will return the milliseconds - * value after epoch. - * @return The string which will insert the current timestamp milliseconds value - * @throws MetaException For unknown database type - */ - private static String epochInCurrentTimezone = null; - protected String getDbEpochString() throws MetaException { - switch (dbProduct) { - case DERBY: - if (epochInCurrentTimezone == null) { - epochInCurrentTimezone = new Timestamp(0).toString(); - } - return "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + epochInCurrentTimezone + - "'), current_timestamp) } / 1000000"; - case MYSQL: - return "round(unix_timestamp(curtime(4)) * 1000)"; - case POSTGRES: - return "round(extract(epoch from current_timestamp) * 1000)"; - case ORACLE: - return "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " + - "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)"; - case SQLSERVER: - return "datediff_big(millisecond, '19700101', sysutcdatetime())"; - default: - String msg = "Unknown database product: " + dbProduct.toString(); - LOG.error(msg); - throw new MetaException(msg); - } - } - - /** * Determine the current time, using the RDBMS as a source of truth * @param conn database connection * @return current time in milliseconds @@ -4276,7 +4236,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "); if(checkHeartbeat) { - suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(getDbEpochString()).append("-").append(timeout); + suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < "); + suffix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout); } TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false); @@ -4524,8 +4485,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { long extLockId = locksBeingChecked.get(0).extLockId; String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + LOCK_ACQUIRED + "', " + //if lock is part of txn, heartbeat info is in txn record - "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : getDbEpochString()) + - ", \"HL_ACQUIRED_AT\" = " + getDbEpochString() + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" + + "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : TxnDbUtil.getEpochFn(dbProduct)) + + ",\"HL_ACQUIRED_AT\" = " + TxnDbUtil.getEpochFn(dbProduct) + + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); @@ -4570,7 +4532,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + - getDbEpochString() + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -4593,7 +4555,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { Statement stmt = null; try { stmt = dbConn.createStatement(); - String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() + + String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); @@ -4849,8 +4811,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); //doing a SELECT first is less efficient but makes it easier to debug things String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " + - getDbEpochString() + "-" + timeout + " AND \"HL_TXNID\" = 0"; //when txnid is <> 0, the lock is - //associated with a txn and is handled by performTimeOuts() + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"HL_TXNID\" = 0"; + //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts() //want to avoid expiring locks for a txn w/o expiring the txn itself List<Long> extLockIDs = new ArrayList<>(); rs = stmt.executeQuery(s); @@ -4870,7 +4832,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //include same hl_last_heartbeat condition in case someone heartbeated since the select prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < "); - prefix.append(getDbEpochString()).append("-").append(timeout); + prefix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout); prefix.append(" AND \"HL_TXNID\" = 0 AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false); @@ -4927,7 +4889,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { while(true) { stmt = dbConn.createStatement(); String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + - "' AND \"TXN_LAST_HEARTBEAT\" < " + getDbEpochString() + "-" + timeout + + "' AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue(); //safety valve for extreme cases s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); @@ -5099,13 +5061,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private static String getMessage(SQLException ex) { return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" + ex.getErrorCode() + ")"; } - /** - * Useful for building SQL strings - * @param value may be {@code null} - */ - private static String valueOrNullLiteral(String value) { - return value == null ? "null" : quoteString(value); - } static String quoteString(String input) { return "'" + input + "'"; }