Repository: hive
Updated Branches:
  refs/heads/master 307bbca96 -> 6137ee5dd


HIVE-20607: TxnHandler should use PreparedStatement to execute direct SQL 
queries (Sankar Hariappan, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6137ee5d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6137ee5d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6137ee5d

Branch: refs/heads/master
Commit: 6137ee5dd23f6a0317489248bfd2a2a97d20a04c
Parents: 307bbca
Author: Sankar Hariappan <[email protected]>
Authored: Tue Sep 25 23:56:51 2018 +0530
Committer: Sankar Hariappan <[email protected]>
Committed: Tue Sep 25 23:56:51 2018 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  44 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |   4 +-
 .../hive/metastore/tools/SQLGenerator.java      | 110 +++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 575 ++++++++++++-------
 4 files changed, 491 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6137ee5d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 369d9a4..b287d43 100644
--- 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,7 +23,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -748,11 +748,14 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
 
       String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", 
\"WNL_ID\" from" +
                       " \"TXN_WRITE_NOTIFICATION_LOG\" " +
-                      "where \"WNL_DATABASE\" = " + quoteString(dbName) +
-                      "and \"WNL_TABLE\" = " + quoteString(tblName) +  " and 
\"WNL_PARTITION\" = " +
-                      quoteString(partition) + " and \"WNL_TXNID\" = " + 
Long.toString(acidWriteEvent.getTxnId()));
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
+                      "where \"WNL_DATABASE\" = ? " +
+                      "and \"WNL_TABLE\" = ? " +  " and \"WNL_PARTITION\" = ? 
" +
+                      "and \"WNL_TXNID\" = " + 
Long.toString(acidWriteEvent.getTxnId()));
+      List<String> params = Arrays.asList(dbName, tblName, partition);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(dbName), quoteString(tblName), 
quoteString(partition));
+      rs = pst.executeQuery();
       if (!rs.next()) {
         // if rs is empty then no lock is taken and thus it can not cause 
deadlock.
         long nextNLId = getNextNLId(stmt, sqlGenerator,
@@ -761,6 +764,7 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
                 "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", 
\"WNL_DATABASE\", \"WNL_TABLE\", " +
                 "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", 
" +
                 "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES 
(?,?,?,?,?,?,?,?,?,?)";
+        closeStmt(pst);
         int currentTime = now();
         pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
         pst.setLong(1, nextNLId);
@@ -793,6 +797,7 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
                 " \"WNL_FILES\" = ? ," +
                 " \"WNL_EVENT_TIME\" = ?" +
                 " where \"WNL_ID\" = ?";
+        closeStmt(pst);
         pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
         pst.setString(1, tableObj);
         pst.setString(2, partitionObj);
@@ -826,6 +831,7 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
       return;
     }
     Statement stmt = null;
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
@@ -852,21 +858,20 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
       long nextNLId = getNextNLId(stmt, sqlGenerator,
               "org.apache.hadoop.hive.metastore.model.MNotificationLog");
 
-      List<String> insert = new ArrayList<>();
+      String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", 
?, ?," +
+              quoteString(" ") + ",?, ?)";
 
-      insert.add(0, nextNLId + "," + nextEventId + "," + now() + "," +
-              quoteString(event.getEventType()) + "," + 
quoteString(event.getDbName()) + "," +
-              quoteString(" ") + "," + quoteString(event.getMessage()) + "," +
-              quoteString(event.getMessageFormat()));
+      s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", 
\"EVENT_TIME\", " +
+              " \"EVENT_TYPE\", \"DB_NAME\", " +
+              " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + 
insertVal;
+      List<String> params = Arrays.asList(
+              event.getEventType(), event.getDbName(), event.getMessage(), 
event.getMessageFormat());
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
 
-      List<String> sql = sqlGenerator.createInsertValuesStmt(
-              "\"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", 
" +
-                      " \"EVENT_TYPE\", \"DB_NAME\"," +
-                      " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\")", 
insert);
-      for (String q : sql) {
-        LOG.info("Going to execute insert <" + q + ">");
-        stmt.execute(q);
-      }
+      LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(event.getEventType()), 
quoteString(event.getDbName()),
+              quoteString(event.getMessage()), 
quoteString(event.getMessageFormat()));
+      pst.execute();
 
       // Set the DB_NOTIFICATION_EVENT_ID for future reference by other 
listeners.
       if (event.isSetEventId()) {
@@ -879,6 +884,7 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
       throw e;
     } finally {
       closeStmt(stmt);
+      closeStmt(pst);
       close(rs);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6137ee5d/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 2576ba2..cc86afe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -98,8 +98,8 @@ public class TestDbTxnManager {
   public void testSingleReadPartition() throws Exception {
     addPartitionInput(newTable(true));
     QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
-    txnMgr.openTxn(ctx, null);
-    txnMgr.acquireLocks(qp, ctx, null);
+    txnMgr.openTxn(ctx, "fred");
+    txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
     Assert.assertEquals(1,

http://git-wip-us.apache.org/repos/asf/hive/blob/6137ee5d/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index d0ac7db..49b737e 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -26,6 +26,9 @@ import 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -47,18 +50,80 @@ public final class SQLGenerator {
   }
 
   /**
-   * Genereates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for 
appropriate DB
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for 
appropriate DB
+   *
+   * @param tblColumns   e.g. "T(a,b,c)"
+   * @param rows         e.g. list of Strings like 3,4,'d'
+   * @param paramsList   List of parameters which in turn is list of Strings 
to be set in PreparedStatement object
+   * @return List PreparedStatement objects for fully formed INSERT INTO ... 
statements
+   */
+  public List<PreparedStatement> createInsertValuesPreparedStmt(Connection 
dbConn,
+                                                                String 
tblColumns, List<String> rows,
+                                                                
List<List<String>> paramsList)
+          throws SQLException {
+    if (rows == null || rows.size() == 0) {
+      return Collections.emptyList();
+    }
+    assert((paramsList == null) || (rows.size() == paramsList.size()));
+
+    List<Integer> rowsCountInStmts = new ArrayList<>();
+    List<String> insertStmts = createInsertValuesStmt(tblColumns, rows, 
rowsCountInStmts);
+    assert(insertStmts.size() == rowsCountInStmts.size());
+
+    List<PreparedStatement> preparedStmts = new ArrayList<>();
+    int paramsListFromIdx = 0;
+    try {
+      for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) {
+        String sql = insertStmts.get(stmtIdx);
+        PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null);
+        if (paramsList != null) {
+          int paramIdx = 1;
+          int paramsListToIdx = paramsListFromIdx + 
rowsCountInStmts.get(stmtIdx);
+          for (int paramsListIdx = paramsListFromIdx; paramsListIdx < 
paramsListToIdx; paramsListIdx++) {
+            List<String> params = paramsList.get(paramsListIdx);
+            for (int i = 0; i < params.size(); i++, paramIdx++) {
+              pStmt.setString(paramIdx, params.get(i));
+            }
+          }
+          paramsListFromIdx = paramsListToIdx;
+        }
+        preparedStmts.add(pStmt);
+      }
+    } catch (SQLException e) {
+      for (PreparedStatement pst : preparedStmts) {
+        pst.close();
+      }
+      throw e;
+    }
+    return preparedStmts;
+  }
+
+  /**
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for 
appropriate DB
    *
    * @param tblColumns e.g. "T(a,b,c)"
    * @param rows       e.g. list of Strings like 3,4,'d'
    * @return fully formed INSERT INTO ... statements
    */
   public List<String> createInsertValuesStmt(String tblColumns, List<String> 
rows) {
+    return createInsertValuesStmt(tblColumns, rows, null);
+  }
+
+  /**
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for 
appropriate DB
+   *
+   * @param tblColumns e.g. "T(a,b,c)"
+   * @param rows       e.g. list of Strings like 3,4,'d'
+   * @param rowsCountInStmts Output the number of rows in each insert 
statement returned.
+   * @return fully formed INSERT INTO ... statements
+   */
+  private List<String> createInsertValuesStmt(String tblColumns, List<String> 
rows, List<Integer> rowsCountInStmts) {
     if (rows == null || rows.size() == 0) {
       return Collections.emptyList();
     }
     List<String> insertStmts = new ArrayList<>();
     StringBuilder sb = new StringBuilder();
+    int numRowsInCurrentStmt = 0;
     switch (dbProduct) {
     case ORACLE:
       if (rows.size() > 1) {
@@ -69,15 +134,23 @@ public final class SQLGenerator {
             if (numRows > 0) {
               sb.append(" select * from dual");
               insertStmts.add(sb.toString());
+              if (rowsCountInStmts != null) {
+                rowsCountInStmts.add(numRowsInCurrentStmt);
+              }
+              numRowsInCurrentStmt = 0;
             }
             sb.setLength(0);
             sb.append("insert all ");
           }
           sb.append("into ").append(tblColumns).append(" 
values(").append(rows.get(numRows))
               .append(") ");
+          numRowsInCurrentStmt++;
         }
         sb.append("select * from dual");
         insertStmts.add(sb.toString());
+        if (rowsCountInStmts != null) {
+          rowsCountInStmts.add(numRowsInCurrentStmt);
+        }
         return insertStmts;
       }
       //fall through
@@ -89,13 +162,21 @@ public final class SQLGenerator {
         if (numRows % MetastoreConf.getIntVar(conf, 
ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
           if (numRows > 0) {
             insertStmts.add(sb.substring(0, sb.length() - 1));//exclude 
trailing comma
+            if (rowsCountInStmts != null) {
+              rowsCountInStmts.add(numRowsInCurrentStmt);
+            }
+            numRowsInCurrentStmt = 0;
           }
           sb.setLength(0);
           sb.append("insert into ").append(tblColumns).append(" values");
         }
         sb.append('(').append(rows.get(numRows)).append("),");
+        numRowsInCurrentStmt++;
       }
       insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing 
comma
+      if (rowsCountInStmts != null) {
+        rowsCountInStmts.add(numRowsInCurrentStmt);
+      }
       return insertStmts;
     default:
       String msg = "Unrecognized database product name <" + dbProduct + ">";
@@ -171,6 +252,33 @@ public final class SQLGenerator {
     }
   }
 
+  /**
+   * Make PreparedStatement object with list of String type parameters to be 
set.
+   * It is assumed the input sql string have the number of "?" equal to number 
of parameters
+   * passed as input.
+   * @param dbConn - Connection object
+   * @param sql - SQL statement with "?" for input parameters.
+   * @param parameters - List of String type parameters to be set in 
PreparedStatement object
+   * @return PreparedStatement type object
+   * @throws SQLException
+   */
+  public PreparedStatement prepareStmtWithParameters(Connection dbConn, String 
sql, List<String> parameters)
+          throws SQLException {
+    PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql));
+    if ((parameters == null) || parameters.isEmpty()) {
+      return pst;
+    }
+    try {
+      for (int i = 1; i <= parameters.size(); i++) {
+        pst.setString(i, parameters.get(i - 1));
+      }
+    } catch (SQLException e) {
+      pst.close();
+      throw e;
+    }
+    return pst;
+  }
+
   public DatabaseProduct getDbProduct() {
     return dbProduct;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6137ee5d/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d76049e..1df1ebc 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -21,6 +21,7 @@ import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.Driver;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
@@ -570,10 +571,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           throws SQLException, MetaException {
     int numTxns = rqst.getNum_txns();
     ResultSet rs = null;
+    List<PreparedStatement> insertPreparedStmts = null;
     TxnType txnType = TxnType.DEFAULT;
     try {
       if (rqst.isSetReplPolicy()) {
-        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), 
rqst.getReplSrcTxnIds(), stmt);
+        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), 
rqst.getReplSrcTxnIds(), dbConn);
 
         if (!targetTxnIdList.isEmpty()) {
           if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
@@ -603,16 +605,20 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       List<Long> txnIds = new ArrayList<>(numTxns);
 
       List<String> rows = new ArrayList<>();
+      List<String> params = new ArrayList<>();
+      params.add(rqst.getUser());
+      params.add(rqst.getHostname());
+      List<List<String>> paramsList = new ArrayList<>(numTxns);
       for (long i = first; i < first + numTxns; i++) {
         txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
-                + quoteString(rqst.getUser()) + "," + 
quoteString(rqst.getHostname()) + "," + txnType.getValue());
+        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + 
",?,?," + txnType.getValue());
+        paramsList.add(params);
       }
-      List<String> queries = sqlGenerator.createInsertValuesStmt(
-            "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, 
txn_user, txn_host, txn_type)", rows);
-      for (String q : queries) {
-        LOG.debug("Going to execute update <" + q + ">");
-        stmt.execute(q);
+      insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+            "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, 
txn_user, txn_host, txn_type)",
+              rows, paramsList);
+      for (PreparedStatement pst : insertPreparedStmts) {
+        pst.execute();
       }
 
       // Need to register minimum open txnid for current transactions into 
MIN_HISTORY table.
@@ -644,18 +650,23 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
       if (rqst.isSetReplPolicy()) {
         List<String> rowsRepl = new ArrayList<>();
-
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.close();
+        }
+        insertPreparedStmts.clear();
+        params.clear();
+        paramsList.clear();
+        params.add(rqst.getReplPolicy());
         for (int i = 0; i < numTxns; i++) {
-          rowsRepl.add(
-                  quoteString(rqst.getReplPolicy()) + "," + 
rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+          rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + 
txnIds.get(i));
+          paramsList.add(params);
         }
 
-        List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
-                "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, 
RTM_TARGET_TXN_ID)", rowsRepl);
-
-        for (String query : queriesRepl) {
-          LOG.info("Going to execute insert <" + query + ">");
-          stmt.execute(query);
+        insertPreparedStmts = 
sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, 
RTM_TARGET_TXN_ID)", rowsRepl,
+                paramsList);
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.execute();
         }
       }
 
@@ -665,12 +676,18 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       }
       return txnIds;
     } finally {
+      if (insertPreparedStmts != null) {
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.close();
+        }
+      }
       close(rs);
     }
   }
 
-  private List<Long> getTargetTxnIdList(String replPolicy, List<Long> 
sourceTxnIdList, Statement stmt)
+  private List<Long> getTargetTxnIdList(String replPolicy, List<Long> 
sourceTxnIdList, Connection dbConn)
           throws SQLException {
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       List<String> inQueries = new ArrayList<>();
@@ -678,15 +695,18 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       StringBuilder suffix = new StringBuilder();
       List<Long> targetTxnIdList = new ArrayList<>();
       prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
-      suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy));
+      suffix.append(" and RTM_REPL_POLICY = ?");
       TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, 
sourceTxnIdList,
               "RTM_SRC_TXN_ID", false, false);
+      List<String> params = Arrays.asList(replPolicy);
       for (String query : inQueries) {
-        LOG.debug("Going to execute select <" + query + ">");
-        rs = stmt.executeQuery(query);
+        LOG.debug("Going to execute select <" + query.replaceAll("\\?", "{}") 
+ ">", quoteString(replPolicy));
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+        rs = pst.executeQuery();
         while (rs.next()) {
           targetTxnIdList.add(rs.getLong(1));
         }
+        closeStmt(pst);
       }
       LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " 
is " + targetTxnIdList.toString());
       return targetTxnIdList;
@@ -694,6 +714,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       LOG.warn("failed to get target txn ids " + e.getMessage());
       throw e;
     } finally {
+      closeStmt(pst);
       close(rs);
     }
   }
@@ -703,12 +724,10 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   public long getTargetTxnId(String replPolicy, long sourceTxnId) throws 
MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, 
Collections.singletonList(sourceTxnId), stmt);
+        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, 
Collections.singletonList(sourceTxnId), dbConn);
         if (targetTxnIds.isEmpty()) {
           LOG.info("Txn {} not present for repl policy {}", sourceTxnId, 
replPolicy);
           return -1;
@@ -722,7 +741,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to get target transaction id "
                 + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        closeDbConn(dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -730,6 +749,14 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
 
+  private void deleteReplTxnMapEntry(Connection dbConn, long sourceTxnId, 
String replPolicy) throws SQLException {
+    String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + 
sourceTxnId + " and RTM_REPL_POLICY = ?";
+    try (PreparedStatement pst = 
sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(replPolicy))) {
+      LOG.info("Going to execute  <" + s.replaceAll("\\?", "{}") + ">", 
quoteString(replPolicy));
+      pst.executeUpdate();
+    }
+  }
+
   @Override
   @RetrySemantics.Idempotent
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, 
MetaException, TxnAbortedException {
@@ -746,7 +773,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         if (rqst.isSetReplPolicy()) {
           sourceTxnId = rqst.getTxnid();
           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
-                  Collections.singletonList(sourceTxnId), stmt);
+                  Collections.singletonList(sourceTxnId), dbConn);
           if (targetTxnIds.isEmpty()) {
             // Idempotent case where txn was already closed or abort txn event 
received without
             // corresponding open txn event.
@@ -764,10 +791,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
             if (rqst.isSetReplPolicy()) {
               // in case of replication, idempotent is taken care by 
getTargetTxnId
               LOG.warn("Invalid state ABORTED for transactions started using 
replication replay task");
-              String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + 
sourceTxnId +
-                      " and RTM_REPL_POLICY = " + 
quoteString(rqst.getReplPolicy());
-              LOG.info("Going to execute  <" + s + ">");
-              stmt.executeUpdate(s);
+              deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
             }
             LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
               ") requested by it is already " + TxnStatus.ABORTED);
@@ -777,10 +801,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         }
 
         if (rqst.isSetReplPolicy()) {
-          String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + 
sourceTxnId +
-              " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
+          deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
 
         if (transactionalListeners != null) {
@@ -879,6 +900,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet lockHandle = null;
       ResultSet commitIdRs = null, rs;
       try {
@@ -889,7 +911,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         if (rqst.isSetReplPolicy()) {
           sourceTxnId = rqst.getTxnid();
           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
-                  Collections.singletonList(sourceTxnId), stmt);
+                  Collections.singletonList(sourceTxnId), dbConn);
           if (targetTxnIds.isEmpty()) {
             // Idempotent case where txn was already closed or commit txn 
event received without
             // corresponding open txn event.
@@ -1048,25 +1070,26 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         } else {
           if (rqst.isSetWriteEventInfos()) {
             List<String> rows = new ArrayList<>();
+            List<List<String>> paramsList = new ArrayList<>();
             for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
-              rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) 
+ "," +
-                      quoteString(writeEventInfo.getTable()) + "," +
-                      quoteString(writeEventInfo.getPartition()) + "," +
+              rows.add(txnid + ", ?, ?, ?," +
                       writeEventInfo.getWriteId() + "," +
-                      "'" + isUpdateDelete + "'");
+                      quoteChar(isUpdateDelete));
+              List<String> params = new ArrayList<>();
+              params.add(writeEventInfo.getDatabase());
+              params.add(writeEventInfo.getTable());
+              params.add(writeEventInfo.getPartition());
+              paramsList.add(params);
             }
-            List<String> queries = 
sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
-                    "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, 
ctc_writeid, ctc_update_delete)", rows);
-            for (String q : queries) {
-              LOG.debug("Going to execute insert  <" + q + "> ");
-              stmt.execute(q);
+            insertPreparedStmts = 
sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                    "COMPLETED_TXN_COMPONENTS " +
+                   "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, 
ctc_writeid, ctc_update_delete)",
+                    rows, paramsList);
+            for (PreparedStatement pst : insertPreparedStmts) {
+              pst.execute();
             }
           }
-
-          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId 
+
-                  " and RTM_REPL_POLICY = " + 
quoteString(rqst.getReplPolicy());
-          LOG.info("Repl going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
+          deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
 
         // cleanup all txn related metadata
@@ -1103,6 +1126,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
         close(commitIdRs);
         close(lockHandle, stmt, dbConn);
         unlockInternal();
@@ -1130,8 +1158,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pStmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
       TxnStore.MutexAPI.LockHandle handle = null;
+      List<String> params = Arrays.asList(dbName, tblName);
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -1139,11 +1170,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
         // Check if this txn state is already replicated for this given table. 
If yes, then it is
         // idempotent case and just return.
-        String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = 
" + quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName);
-        LOG.debug("Going to execute query <" + sql + ">");
-
-        rs = stmt.executeQuery(sql);
+        String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = 
? and nwi_table = ?";
+        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
+        LOG.debug("Going to execute query <" + sql.replaceAll("\\?", "{}") + 
">",
+                quoteString(dbName), quoteString(tblName));
+        rs = pStmt.executeQuery();
         if (rs.next()) {
           LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> 
is already applied for the table: "
                   + dbName + "." + tblName);
@@ -1159,19 +1190,21 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
           // Map each aborted write id with each allocated txn.
           List<String> rows = new ArrayList<>();
+          List<List<String>> paramsList = new ArrayList<>();
           int i = 0;
           for (long txn : txnIds) {
             long writeId = abortedWriteIds.get(i++);
-            rows.add(txn + ", " + quoteString(dbName) + ", " + 
quoteString(tblName) + ", " + writeId);
+            rows.add(txn + ", ?, ?, " + writeId);
+            paramsList.add(params);
             LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
           }
 
           // Insert entries to TXN_TO_WRITE_ID for aborted write ids
-          List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                  "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, 
t2w_writeid)", rows);
-          for (String insert : inserts) {
-            LOG.debug("Going to execute insert <" + insert + ">");
-            stmt.execute(insert);
+          insertPreparedStmts = 
sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                  "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, 
t2w_writeid)", rows,
+                  paramsList);
+          for (PreparedStatement pst : insertPreparedStmts) {
+            pst.execute();
           }
 
           // Abort all the allocated txns so that the mapped write ids are 
referred as aborted ones.
@@ -1186,11 +1219,13 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         long nextWriteId = validWriteIdList.getHighWatermark() + 1;
 
         // First allocation of write id (hwm+1) should add the table to the 
next_write_id meta table.
-        sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) 
values ("
-                + quoteString(dbName) + "," + quoteString(tblName) + ","
+        sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) 
values (?, ?, "
                 + Long.toString(nextWriteId) + ")";
-        LOG.debug("Going to execute insert <" + sql + ">");
-        stmt.execute(sql);
+        closeStmt(pStmt);
+        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
+        LOG.debug("Going to execute insert <" + sql.replaceAll("\\?", "{}") + 
">",
+                quoteString(dbName), quoteString(tblName));
+        pStmt.execute();
 
         LOG.info("WriteId state <" + validWriteIdList + "> is applied for the 
table: " + dbName + "." + tblName);
         LOG.debug("Going to commit");
@@ -1202,6 +1237,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
                 + StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
+        closeStmt(pStmt);
         close(rs, stmt, dbConn);
         if(handle != null) {
           handle.releaseLocks();
@@ -1244,13 +1285,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     try {
       String[] names = TxnUtils.getDbTableName(fullTableName);
       assert names.length == 2;
-      String s = "select t2w_txnid from TXN_TO_WRITE_ID where  t2w_database = 
? and t2w_table = ? and t2w_writeid = ?";
-      pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
-      pst.setString(1, names[0]);
-      pst.setString(2, names[1]);
-      pst.setLong(3, writeId);
+      List<String> params = Arrays.asList(names[0], names[1]);
+      String s = "select t2w_txnid from TXN_TO_WRITE_ID where  t2w_database = 
? and t2w_table = ? and t2w_writeid = " + writeId;
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
       LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", 
quoteString(names[0]),
-              quoteString(names[1]), writeId);
+              quoteString(names[1]));
       rs = pst.executeQuery();
       if (rs.next()) {
         return TxnCommonUtils.createValidReadTxnList(getOpenTxns(), 
rs.getLong(1));
@@ -1266,7 +1305,6 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest 
rqst) throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       ValidTxnList validTxnList;
 
       try {
@@ -1274,7 +1312,6 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
          * This runs at READ_COMMITTED for exactly the same reason as {@link 
#getOpenTxnsInfo()}
          */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         // We should prepare the valid write ids list based on validTxnList of 
current txn.
         // If no txn exists in the caller, then they would pass null for 
validTxnList and so it is
@@ -1292,7 +1329,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // Get the valid write id list for all the tables read by the current 
txn
         List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
         for (String fullTableName : rqst.getFullTableNames()) {
-          tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, 
fullTableName, validTxnList));
+          tblValidWriteIdsList.add(getValidWriteIdsForTable(dbConn, 
fullTableName, validTxnList));
         }
 
         LOG.debug("Going to rollback");
@@ -1306,7 +1343,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database, "
                 + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
       return getValidWriteIds(rqst);
@@ -1315,10 +1352,13 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
   // Method to get the Valid write ids list for the given table
   // Input fullTableName is expected to be of format <db_name>.<table_name>
-  private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String 
fullTableName,
+  private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, 
String fullTableName,
                                                ValidTxnList validTxnList) 
throws SQLException {
+    PreparedStatement pst = null;
     ResultSet rs = null;
     String[] names = TxnUtils.getDbTableName(fullTableName);
+    assert(names.length == 2);
+    List<String> params = Arrays.asList(names[0], names[1]);
     try {
       // Need to initialize to 0 to make sure if nobody modified this table, 
then current txn
       // shouldn't read any data.
@@ -1333,11 +1373,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       // Find the writeId high water mark based upon txnId high water mark. If 
found, then, need to
       // traverse through all write Ids less than writeId HWM to make 
exceptions list.
       // The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, 
max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm))
-      String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid 
<= " + txnHwm
-              + " and t2w_database = " + quoteString(names[0])
-              + " and t2w_table = " + quoteString(names[1]);
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
+      String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid 
<= " + Long.toString(txnHwm)
+              + " and t2w_database = ? and t2w_table = ?";
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(names[0]), quoteString(names[1]));
+      rs = pst.executeQuery();
       if (rs.next()) {
         writeIdHwm = rs.getLong(1);
       }
@@ -1346,10 +1387,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       if (writeIdHwm <= 0) {
         // Need to subtract 1 as nwi_next would be the next write id to be 
allocated but we need highest
         // allocated write id.
-        s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + 
quoteString(names[0])
-                + " and nwi_table = " + quoteString(names[1]);
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
+        s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = ? and 
nwi_table = ?";
+        closeStmt(pst);
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+                quoteString(names[0]), quoteString(names[1]));
+        rs = pst.executeQuery();
         if (rs.next()) {
           long maxWriteId = rs.getLong(1);
           if (maxWriteId > 0) {
@@ -1363,13 +1406,13 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       // then will be added to invalid list. The results should be sorted in 
ascending order based
       // on write id. The sorting is needed as exceptions list in 
ValidWriteIdList would be looked-up
       // using binary search.
-      s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where 
t2w_writeid <= " + writeIdHwm
-              + " and t2w_database = " + quoteString(names[0])
-              + " and t2w_table = " + quoteString(names[1])
-              + " order by t2w_writeid asc";
-
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
+      s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where 
t2w_writeid <= " + Long.toString(writeIdHwm)
+              + " and t2w_database = ? and t2w_table = ? order by t2w_writeid 
asc";
+      closeStmt(pst);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(names[0]), quoteString(names[1]));
+      rs = pst.executeQuery();
       while (rs.next()) {
         long txnId = rs.getLong(1);
         long writeId = rs.getLong(2);
@@ -1395,6 +1438,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       }
       return owi;
     } finally {
+      closeStmt(pst);
       close(rs);
     }
   }
@@ -1409,6 +1453,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pStmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
@@ -1428,7 +1474,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           for (TxnToWriteId txnToWriteId :  srcTxnToWriteIds) {
             srcTxnIds.add(txnToWriteId.getTxnId());
           }
-          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
+          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
           if (srcTxnIds.size() != txnIds.size()) {
             // Idempotent case where txn was already closed but gets allocate 
write id event.
             // So, just ignore it and return empty list.
@@ -1459,8 +1505,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // The write id would have been already allocated in case of 
multi-statement txns where
         // first write on a table will allocate write id and rest of the 
writes should re-use it.
         prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID 
where"
-                        + " t2w_database = " + quoteString(dbName)
-                        + " and t2w_table = " + quoteString(tblName) + " and 
");
+                        + " t2w_database = ? and t2w_table = ?" + " and ");
         suffix.append("");
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
                 txnIds, "t2w_txnid", false, false);
@@ -1468,9 +1513,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         long allocatedTxnsCount = 0;
         long txnId;
         long writeId = 0;
+        List<String> params = Arrays.asList(dbName, tblName);
         for (String query : queries) {
-          LOG.debug("Going to execute query <" + query + ">");
-          rs = stmt.executeQuery(query);
+          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, 
params);
+          LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") 
+ ">",
+                  quoteString(dbName), quoteString(tblName));
+          rs = pStmt.executeQuery();
           while (rs.next()) {
             // If table write ID is already allocated for the given 
transaction, then just use it
             txnId = rs.getLong(1);
@@ -1479,6 +1527,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
             allocatedTxnsCount++;
             LOG.info("Reused already allocated writeID: " + writeId + " for 
txnId: " + txnId);
           }
+          closeStmt(pStmt);
         }
 
         // Batch allocation should always happen atomically. Either write ids 
for all txns is allocated or none.
@@ -1503,58 +1552,69 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // Get the next write id for the given table and update it with new 
next write id.
         // This is select for update query which takes a lock if the table 
entry is already there in NEXT_WRITE_ID
         String s = sqlGenerator.addForUpdateClause(
-                "select nwi_next from NEXT_WRITE_ID where nwi_database = " + 
quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName));
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
+                "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and 
nwi_table = ?");
+        closeStmt(pStmt);
+        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+        rs = pStmt.executeQuery();
         if (!rs.next()) {
           // First allocation of write id should add the table to the 
next_write_id meta table
           // The initial value for write id should be 1 and hence we add 1 
with number of write ids allocated here
           // For repl flow, we need to force set the incoming write id.
           writeId = (srcWriteId > 0) ? srcWriteId : 1;
-          s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) 
values ("
-                  + quoteString(dbName) + "," + quoteString(tblName) + "," + 
(writeId + numOfWriteIds) + ")";
-          LOG.debug("Going to execute insert <" + s + ">");
-          stmt.execute(s);
+          s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) 
values (?, ?, "
+                  + Long.toString(writeId + numOfWriteIds) + ")";
+          closeStmt(pStmt);
+          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+          LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + 
">",
+                  quoteString(dbName), quoteString(tblName));
+          pStmt.execute();
         } else {
           long nextWriteId = rs.getLong(1);
           writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
 
           // Update the NEXT_WRITE_ID for the given table after incrementing 
by number of write ids allocated
-          s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + 
numOfWriteIds)
-                  + " where nwi_database = " + quoteString(dbName)
-                  + " and nwi_table = " + quoteString(tblName);
-          LOG.debug("Going to execute update <" + s + ">");
-          stmt.executeUpdate(s);
+          s = "update NEXT_WRITE_ID set nwi_next = " + Long.toString(writeId + 
numOfWriteIds)
+                  + " where nwi_database = ? and nwi_table = ?";
+          closeStmt(pStmt);
+          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+          LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + 
">",
+                  quoteString(dbName), quoteString(tblName));
+          pStmt.executeUpdate();
 
           // For repl flow, if the source write id is mismatching with target 
next write id, then current
           // metadata in TXN_TO_WRITE_ID is stale for this table and hence 
need to clean-up TXN_TO_WRITE_ID.
           // This is possible in case of first incremental repl after 
bootstrap where concurrent write
           // and drop table was performed at source during bootstrap dump.
           if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) {
-            s = "delete from TXN_TO_WRITE_ID where t2w_database = " + 
quoteString(dbName)
-                    + " and t2w_table = " + quoteString(tblName);
-            LOG.debug("Going to execute delete <" + s + ">");
-            stmt.executeUpdate(s);
+            s = "delete from TXN_TO_WRITE_ID where t2w_database = ? and 
t2w_table = ?";
+            closeStmt(pStmt);
+            pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+            LOG.debug("Going to execute delete <" + s.replaceAll("\\?", "{}") 
+ ">",
+                    quoteString(dbName), quoteString(tblName));
+            pStmt.executeUpdate();
           }
         }
 
         // Map the newly allocated write ids against the list of txns which 
doesn't have pre-allocated
         // write ids
         List<String> rows = new ArrayList<>();
+        List<List<String>> paramsList = new ArrayList<>();
         for (long txn : txnIds) {
-          rows.add(txn + ", " + quoteString(dbName) + ", " + 
quoteString(tblName) + ", " + writeId);
+          rows.add(txn + ", ?, ?, " + writeId);
           txnToWriteIds.add(new TxnToWriteId(txn, writeId));
+          paramsList.add(params);
           LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
           writeId++;
         }
 
         // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
-        List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, 
t2w_writeid)", rows);
-        for (String insert : inserts) {
-          LOG.debug("Going to execute insert <" + insert + ">");
-          stmt.execute(insert);
+        insertPreparedStmts = 
sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, 
t2w_writeid)", rows,
+                paramsList);
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.execute();
         }
 
         if (transactionalListeners != null) {
@@ -1574,6 +1634,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
                 + StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
+        closeStmt(pStmt);
         close(rs, stmt, dbConn);
         if(handle != null) {
           handle.releaseLocks();
@@ -1589,12 +1655,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
+      PreparedStatement pst = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
         //since this is on conversion from non-acid to acid, NEXT_WRITE_ID 
should not have an entry
@@ -1603,11 +1668,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // First allocation of write id should add the table to the 
next_write_id meta table
         // The initial value for write id should be 1 and hence we add 1 with 
number of write ids
         // allocated here
-        String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, 
nwi_next) values ("
-            + quoteString(rqst.getDbName()) + "," + 
quoteString(rqst.getTblName()) + "," +
-            Long.toString(rqst.getSeeWriteId() + 1) + ")";
-        LOG.debug("Going to execute insert <" + s + ">");
-        stmt.execute(s);
+        String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, 
nwi_next) values (?, ?, "
+                + Long.toString(rqst.getSeeWriteId() + 1) + ")";
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, 
Arrays.asList(rqst.getDbName(), rqst.getTblName()));
+        LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + 
">",
+                quoteString(rqst.getDbName()), quoteString(rqst.getTblName()));
+        pst.execute();
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
@@ -1617,7 +1683,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        close(null, pst, dbConn);
         if(handle != null) {
           handle.releaseLocks();
         }
@@ -1737,12 +1803,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     // We are composing a query that returns a single row if an update 
happened after
     // the materialization was created. Otherwise, query returns 0 rows.
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      stmt.setMaxRows(1);
+      List<String> params = new ArrayList<>();
       StringBuilder query = new StringBuilder();
       // compose a query that select transactions containing an update...
       query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS 
where ctc_update_delete='Y' AND (");
@@ -1754,7 +1819,10 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           query.append("OR");
         }
         String[] names = TxnUtils.getDbTableName(fullyQualifiedName);
-        query.append(" (ctc_database=" + quoteString(names[0]) + " AND 
ctc_table=" + quoteString(names[1]));
+        assert(names.length == 2);
+        query.append(" (ctc_database=? AND ctc_table=?");
+        params.add(names[0]);
+        params.add(names[1]);
         ValidWriteIdList tblValidWriteIdList =
             
validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
         if (tblValidWriteIdList == null) {
@@ -1780,7 +1848,9 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + s + ">");
       }
-      rs = stmt.executeQuery(s);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      pst.setMaxRows(1);
+      rs = pst.executeQuery();
 
       return new Materialization(rs.next());
     } catch (SQLException ex) {
@@ -1788,7 +1858,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       throw new MetaException("Unable to retrieve materialization invalidation 
information due to " +
           StringUtils.stringifyException(ex));
     } finally {
-      close(rs, stmt, dbConn);
+      close(rs, pst, dbConn);
     }
   }
 
@@ -1802,7 +1872,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
     TxnStore.MutexAPI.LockHandle handle = null;
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       lockInternal();
@@ -1813,13 +1883,14 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
        */
       handle = 
getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name());
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
 
+      List<String> params = Arrays.asList(dbName, tableName);
       String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS 
where" +
-          " mrl_db_name =" + quoteString(dbName) +
-          " AND mrl_tbl_name=" + quoteString(tableName);
-      LOG.debug("Going to execute query <" + selectQ + ">");
-      rs = stmt.executeQuery(selectQ);
+          " mrl_db_name = ? AND mrl_tbl_name = ?";
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, selectQ, params);
+      LOG.debug("Going to execute query <" + selectQ.replaceAll("\\?", "{}") + 
">",
+              quoteString(dbName), quoteString(tableName));
+      rs = pst.executeQuery();
       if(rs.next()) {
         LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName +
             " since it is already being rebuilt");
@@ -1827,9 +1898,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       }
       String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " +
           "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values 
(" + txnId +
-          ", '" + dbName + "', '" + tableName + "', " + 
Instant.now().toEpochMilli() + ")";
-      LOG.debug("Going to execute update <" + insertQ + ">");
-      stmt.executeUpdate(insertQ);
+          ", ?, ?, " + Instant.now().toEpochMilli() + ")";
+      closeStmt(pst);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, insertQ, params);
+      LOG.debug("Going to execute update <" + insertQ.replaceAll("\\?", "{}") 
+ ">",
+              quoteString(dbName), quoteString(tableName));
+      pst.executeUpdate();
       LOG.debug("Going to commit");
       dbConn.commit();
       return new LockResponse(txnId, LockState.ACQUIRED);
@@ -1838,7 +1912,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       throw new MetaException("Unable to retrieve materialization invalidation 
information due to " +
           StringUtils.stringifyException(ex));
     } finally {
-      close(rs, stmt, dbConn);
+      close(rs, pst, dbConn);
       if(handle != null) {
         handle.releaseLocks();
       }
@@ -1851,19 +1925,19 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
+      PreparedStatement pst = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
         String s = "update MATERIALIZATION_REBUILD_LOCKS" +
             " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() +
             " where mrl_txn_id = " + txnId +
-            " AND mrl_db_name =" + quoteString(dbName) +
-            " AND mrl_tbl_name=" + quoteString(tableName);
-        LOG.debug("Going to execute update <" + s + ">");
-        int rc = stmt.executeUpdate(s);
+            " AND mrl_db_name = ?" +
+            " AND mrl_tbl_name = ?";
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, 
Arrays.asList(dbName, tableName));
+        LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + 
">",
+                quoteString(dbName), quoteString(tableName));
+        int rc = pst.executeUpdate();
         if (rc < 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
@@ -1884,7 +1958,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         throw new MetaException("Unable to heartbeat rebuild lock due to " +
             StringUtils.stringifyException(e));
       } finally {
-        close(rs, stmt, dbConn);
+        close(null, pst, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -2019,6 +2093,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     Connection dbConn = null;
     try {
       Statement stmt = null;
+      PreparedStatement pStmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
       ResultSet lockHandle = null;
       try {
@@ -2056,6 +2132,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
         if (txnid > 0) {
           List<String> rows = new ArrayList<>();
+          List<List<String>> paramsList = new ArrayList<>();
           // For each component in this lock request,
           // add an entry to the txn_components table
           for (LockComponent lc : rqst.getComponent()) {
@@ -2115,30 +2192,41 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
               // may return empty result sets.
               // Get the write id allocated by this txn for the given table 
writes
               s = "select t2w_writeid from TXN_TO_WRITE_ID where"
-                      + " t2w_database = " + quoteString(dbName)
-                      + " and t2w_table = " + quoteString(tblName)
-                      + " and t2w_txnid = " + txnid;
-              LOG.debug("Going to execute query <" + s + ">");
-              rs = stmt.executeQuery(s);
+                      + " t2w_database = ? and t2w_table = ? and t2w_txnid = " 
+ txnid;
+              pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, 
Arrays.asList(dbName, tblName));
+              LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") 
+ ">",
+                      quoteString(dbName), quoteString(tblName));
+              rs = pStmt.executeQuery();
               if (rs.next()) {
                 writeId = rs.getLong(1);
               }
             }
-            rows.add(txnid + ", '" + dbName + "', " +
-                    (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-                    (partName == null ? "null" : "'" + partName + "'")+ "," +
+            rows.add(txnid + ", ?, " +
+                    (tblName == null ? "null" : "?") + ", " +
+                    (partName == null ? "null" : "?")+ "," +
                     
quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+
 "," +
                     (writeId == null ? "null" : writeId));
+            List<String> params = new ArrayList<>();
+            params.add(dbName);
+            if (tblName != null) {
+              params.add(tblName);
+            }
+            if (partName != null) {
+              params.add(partName);
+            }
+            paramsList.add(params);
           }
-          List<String> queries = sqlGenerator.createInsertValuesStmt(
-              "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, 
tc_operation_type, tc_writeid)", rows);
-          for(String query : queries) {
-            LOG.debug("Going to execute update <" + query + ">");
-            int modCount = stmt.executeUpdate(query);
+          insertPreparedStmts = 
sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+              "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, 
tc_operation_type, tc_writeid)",
+                  rows, paramsList);
+          for(PreparedStatement pst : insertPreparedStmts) {
+            int modCount = pst.executeUpdate();
+            closeStmt(pst);
           }
+          insertPreparedStmts = null;
         }
-
         List<String> rows = new ArrayList<>();
+        List<List<String>> paramsList = new ArrayList<>();
         long intLockId = 0;
         for (LockComponent lc : rqst.getComponent()) {
           if(lc.isSetOperationType() && lc.getOperationType() == 
DataOperationType.UNSET &&
@@ -2170,24 +2258,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               break;
           }
           long now = getDbTime(dbConn);
-            rows.add(extLockId + ", " + intLockId + "," + txnid + ", " +
-            quoteString(dbName) + ", " +
-            valueOrNullLiteral(tblName) + ", " +
-            valueOrNullLiteral(partName) + ", " +
+          rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
+                  ((tblName == null) ? "null" : "?") + ", " +
+                  ((partName == null) ? "null" : "?") + ", " +
             quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
             //for locks associated with a txn, we always heartbeat txn and timeout based on that
             (isValidTxn(txnid) ? 0 : now) + ", " +
-            valueOrNullLiteral(rqst.getUser()) + ", " +
-            valueOrNullLiteral(rqst.getHostname()) + ", " +
-            valueOrNullLiteral(rqst.getAgentInfo()));// + ")";
+                  ((rqst.getUser() == null) ? "null" : "?")  + ", " +
+                  ((rqst.getHostname() == null) ? "null" : "?") + ", " +
+                  ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
+          List<String> params = new ArrayList<>();
+          params.add(dbName);
+          if (tblName != null) {
+            params.add(tblName);
+          }
+          if (partName != null) {
+            params.add(partName);
+          }
+          if (rqst.getUser() != null) {
+            params.add(rqst.getUser());
+          }
+          if (rqst.getHostname() != null) {
+            params.add(rqst.getHostname());
+          }
+          if (rqst.getAgentInfo() != null) {
+            params.add(rqst.getAgentInfo());
+          }
+          paramsList.add(params);
         }
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
+        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
           "HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " +
             "hl_table, hl_partition,hl_lock_state, hl_lock_type, " +
-            "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows);
-        for(String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          int modCount = stmt.executeUpdate(query);
+            "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows, paramsList);
+        for(PreparedStatement pst : insertPreparedStmts) {
+          int modCount = pst.executeUpdate();
         }
         dbConn.commit();
         success = true;
@@ -2199,7 +2303,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
         close(lockHandle);
+        closeStmt(pStmt);
         close(rs, stmt, null);
         if (!success) {
           /* This needs to return a "live" connection to be used by operation that follows it.
@@ -2399,10 +2509,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       ShowLocksResponse rsp = new ShowLocksResponse();
       List<ShowLocksResponseElement> elems = new ArrayList<>();
       List<LockInfoExt> sortedList = new ArrayList<>();
-      Statement stmt = null;
+      PreparedStatement pst = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
           "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
@@ -2412,22 +2521,26 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         String dbName = rqst.getDbname();
         String tableName = rqst.getTablename();
         String partName = rqst.getPartname();
+        List<String> params = new ArrayList<>();
 
         StringBuilder filter = new StringBuilder();
         if (dbName != null && !dbName.isEmpty()) {
-          filter.append("hl_db=").append(quoteString(dbName));
+          filter.append("hl_db=?");
+          params.add(dbName);
         }
         if (tableName != null && !tableName.isEmpty()) {
           if (filter.length() > 0) {
             filter.append(" and ");
           }
-          filter.append("hl_table=").append(quoteString(tableName));
+          filter.append("hl_table=?");
+          params.add(tableName);
         }
         if (partName != null && !partName.isEmpty()) {
           if (filter.length() > 0) {
             filter.append(" and ");
           }
-          filter.append("hl_partition=").append(quoteString(partName));
+          filter.append("hl_partition=?");
+          params.add(partName);
         }
         String whereClause = filter.toString();
 
@@ -2435,8 +2548,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           s = s + " where " + whereClause;
         }
 
-        LOG.debug("Doing to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = pst.executeQuery();
         while (rs.next()) {
           ShowLocksResponseElement e = new ShowLocksResponseElement();
           e.setLockid(rs.getLong(1));
@@ -2481,7 +2595,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
+        closeStmt(pst);
         closeDbConn(dbConn);
       }
       //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
@@ -2615,20 +2729,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       String dbName, String tblName, long writeId) throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
+      PreparedStatement pst = null;
       try {
         /**
          * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
          */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         String query = "select t2w_txnid from TXN_TO_WRITE_ID where"
-            + " t2w_database = " + quoteString(dbName)
-            + " and t2w_table = " + quoteString(tblName)
-            + " and t2w_writeid = " + writeId;
-        LOG.debug("Going to execute query <" + query + ">");
-        ResultSet rs  = stmt.executeQuery(query);
+            + " t2w_database = ? and t2w_table = ? and t2w_writeid = " + writeId;
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, Arrays.asList(dbName, tblName));
+        LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+        ResultSet rs  = pst.executeQuery();
         long txnId = -1;
         if (rs.next()) {
           txnId = rs.getLong(1);
@@ -2642,7 +2755,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database, "
                 + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        close(null, pst, dbConn);
       }
     } catch (RetryException e) {
       return getTxnIdForWriteId(dbName, tblName, writeId);
@@ -2656,6 +2769,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pst = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       try {
         lockInternal();
@@ -2670,20 +2784,24 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         long id = generateCompactionQueueId(stmt);
 
+        List<String> params = new ArrayList<>();
         StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where").
           append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
             append(",").append(quoteChar(WORKING_STATE)).
-          append(") AND cq_database=").append(quoteString(rqst.getDbname())).
-          append(" AND cq_table=").append(quoteString(rqst.getTablename())).append(" AND ");
+          append(") AND cq_database=?").
+          append(" AND cq_table=?").append(" AND ");
+        params.add(rqst.getDbname());
+        params.add(rqst.getTablename());
         if(rqst.getPartitionname() == null) {
           sb.append("cq_partition is null");
-        }
-        else {
-          sb.append("cq_partition=").append(quoteString(rqst.getPartitionname()));
+        } else {
+          sb.append("cq_partition=?");
+          params.add(rqst.getPartitionname());
         }
 
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
         LOG.debug("Going to execute query <" + sb.toString() + ">");
-        ResultSet rs = stmt.executeQuery(sb.toString());
+        ResultSet rs = pst.executeQuery();
         if(rs.next()) {
           long enqueuedId = rs.getLong(1);
           String state = compactorStateToResponse(rs.getString(2).charAt(0));
@@ -2693,6 +2811,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           return new CompactionResponse(enqueuedId, state, false);
         }
         close(rs);
+        closeStmt(pst);
+        params.clear();
         StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
           "cq_table, ");
         String partName = rqst.getPartitionname();
@@ -2704,14 +2824,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (rqst.getRunas() != null) buf.append(", cq_run_as");
         buf.append(") values (");
         buf.append(id);
-        buf.append(", '");
-        buf.append(rqst.getDbname());
-        buf.append("', '");
-        buf.append(rqst.getTablename());
-        buf.append("', '");
+        buf.append(", ?");
+        buf.append(", ?");
+        buf.append(", ");
+        params.add(rqst.getDbname());
+        params.add(rqst.getTablename());
         if (partName != null) {
-          buf.append(partName);
-          buf.append("', '");
+          buf.append("?, '");
+          params.add(partName);
+        } else {
+          buf.append("'");
         }
         buf.append(INITIATED_STATE);
         buf.append("', '");
@@ -2729,18 +2851,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             dbConn.rollback();
             throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
         }
+        buf.append("'");
         if (rqst.getProperties() != null) {
-          buf.append("', '");
-          buf.append(new StringableMap(rqst.getProperties()).toString());
+          buf.append(", ?");
+          params.add(new StringableMap(rqst.getProperties()).toString());
         }
         if (rqst.getRunas() != null) {
-          buf.append("', '");
-          buf.append(rqst.getRunas());
+          buf.append(", ?");
+          params.add(rqst.getRunas());
         }
-        buf.append("')");
+        buf.append(")");
         String s = buf.toString();
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
         LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        pst.executeUpdate();
         LOG.debug("Going to commit");
         dbConn.commit();
         return new CompactionResponse(id, INITIATED_RESPONSE, true);
@@ -2751,6 +2875,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        closeStmt(pst);
         closeStmt(stmt);
         closeDbConn(dbConn);
         if(handle != null) {
@@ -2859,6 +2984,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet lockHandle = null;
+    List<PreparedStatement> insertPreparedStmts = null;
     try {
       try {
         lockInternal();
@@ -2878,18 +3004,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         Long writeId = rqst.getWriteid();
         List<String> rows = new ArrayList<>();
+        List<List<String>> paramsList = new ArrayList<>();
         for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname()))
-              + "," + quoteString(normalizeCase(rqst.getTablename())) +
-              "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
+          rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
+          List<String> params = new ArrayList<>();
+          params.add(normalizeCase(rqst.getDbname()));
+          params.add(normalizeCase(rqst.getTablename()));
+          params.add(partName);
+          paramsList.add(params);
         }
         int modCount = 0;
         //record partitions that were written to
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
-            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
-        for(String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          modCount = stmt.executeUpdate(query);
+        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)",
+                rows, paramsList);
+        for(PreparedStatement pst : insertPreparedStmts) {
+          modCount = pst.executeUpdate();
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -2900,6 +3030,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to insert into from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for(PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
         close(lockHandle, stmt, dbConn);
         unlockInternal();
       }

Reply via email to