veghlaci05 commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1425275159


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -648,260 +400,85 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
       throw new MetaException("Invalid input for number of txns: " + numTxns);
     }
 
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        /*
-         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
-         * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
-         * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause
-         * performance degradation with high transaction load.
-         * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
-         * transaction missing from the TXNS table in that period open, and prevent opening transaction outside
-         * the period.
-         * Example: At t[0] there is one open transaction in the TXNS table, T[1].
-         * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
-         * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
-         * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s
-         * open transaction list. T[1] because it is presented as open in TXNS,
-         * T[2] because it is a missing sequence.
-         *
-         * In the current design, there can be several metastore instances running in a given Warehouse.
-         * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
-         * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
-         * Now the same client will start another transaction, except it ends up on MS2 and may get
-         * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
-         * on read will thing the version of the row from transaction ID 500 is the latest one.
-         *
-         * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations).  This
-         * set could support a write-through cache for added performance.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        /*
-         * The openTxn and commitTxn must be mutexed, when committing a not read only transaction.
-         * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
-         * Since table locks are working in Derby, we don't need the lockInternal call here.
-         * Example: Suppose we have two transactions with update like x = x+1.
-         * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
-         * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2],
-         * and we will have a lost update problem
-         */
-        acquireTxnLock(stmt, true);
-        // Measure the time from acquiring the sequence value, till committing in the TXNS table
-        StopWatch generateTransactionWatch = new StopWatch();
-        generateTransactionWatch.start();
-
-        List<Long> txnIds = openTxns(dbConn, rqst);
+    /*
+     * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+     * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
+     * One way to do it would be to serialize the openTxns call with an S4U lock, but that would cause
+     * performance degradation with high transaction load.
+     * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
+     * transaction missing from the TXNS table in that period open, and prevent opening transactions outside
+     * the period.
+     * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+     * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
+     * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
+     * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s
+     * open transaction list. T[1] because it is presented as open in TXNS,
+     * T[2] because it is a missing sequence.
+     *
+     * In the current design, there can be several metastore instances running in a given Warehouse.
+     * This makes ideas like reserving a range of IDs to save trips to the DB impossible.  For example,
+     * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+     * Now the same client will start another transaction, except it ends up on MS2 and may get
+     * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
+     * on read will think the version of the row from transaction ID 500 is the latest one.
+     *
+     * Longer term we can consider running Active-Passive MS (at least wrt ACID operations).  This
+     * setup could support a write-through cache for added performance.
+     */
+    /*
+     * The openTxn and commitTxn must be mutexed when committing a non-read-only transaction.
+     * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
+     * Since table locks are working in Derby, we don't need the lockInternal call here.
+     * Example: Suppose we have two transactions with an update like x = x+1.
+     * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
+     * and opening T[4] in parallel, it is possible that T[4] will be using the value from a snapshot with T[2,2],
+     * and we will have a lost-update problem.
+     */
+    acquireTxnLock(true);
+    // Measure the time from acquiring the sequence value, till committing in the TXNS table
+    StopWatch generateTransactionWatch = new StopWatch();
+    generateTransactionWatch.start();
 
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        generateTransactionWatch.stop();
-        long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
-        TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
-        if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
-          /*
-           * The commit was too slow, we can not allow this to continue (except if it is read only,
-           * since that can not cause dirty reads).
-           * When calculating the snapshot for a given transaction, we look back for possible open transactions
-           * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period.
-           * We can not allow a write transaction, that was slower than TXN_OPENTXN_TIMEOUT to continue,
-           * because there can be other transactions running, that didn't considered this transactionId open,
-           * this could cause dirty reads.
-           */
-          LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds);
-          deleteInvalidOpenTransactions(dbConn, txnIds);
-          dbConn.commit();
-          /*
-           * We do not throw RetryException directly, to not circumvent the max retry limit
-           */
-          throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY);
-        }
-        return new OpenTxnsResponse(txnIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback: ", e);
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return openTxns(rqst);
-    }
-  }
+    List<Long> txnIds = new OpenTxnsFunction(rqst, openTxnTimeOutMillis, transactionalListeners).execute(jdbcResource);
 
-  private List<Long> openTxns(Connection dbConn, OpenTxnRequest rqst)
-          throws SQLException, MetaException {
-    int numTxns = rqst.getNum_txns();
-    // Make sure the user has not requested an insane amount of txns.
-    int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
-    if (numTxns > maxTxns) {
-      numTxns = maxTxns;
-    }
-    List<PreparedStatement> insertPreparedStmts = null;
+    LOG.debug("Going to commit");
+    jdbcResource.getTransactionManager().getActiveTransaction().createSavepoint();
+    generateTransactionWatch.stop();
+    long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
     TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
-    boolean isReplayedReplTxn = txnType == TxnType.REPL_CREATED;
-    boolean isHiveReplTxn = rqst.isSetReplPolicy() && txnType == TxnType.DEFAULT;
-    try {
-      if (isReplayedReplTxn) {
-        assert rqst.isSetReplPolicy();
-        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), dbConn);
-
-        if (!targetTxnIdList.isEmpty()) {
-          if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
-            LOG.warn("target txn id number {} is not matching with source txn id number {}",
-                targetTxnIdList, rqst.getReplSrcTxnIds());
-          }
-          LOG.info("Target transactions {} are present for repl policy : {} and Source transaction id : {}",
-              targetTxnIdList.toString(), rqst.getReplPolicy(), rqst.getReplSrcTxnIds().toString());
-          return targetTxnIdList;
-        }
-      }
-
-      long minOpenTxnId = 0;
-      if (useMinHistoryLevel) {
-        minOpenTxnId = getMinOpenTxnIdWaterMark(dbConn);
-      }
-
-      List<Long> txnIds = new ArrayList<>(numTxns);
+    if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
       /*
-       * The getGeneratedKeys are not supported in every dbms, after executing a multi line insert.
-       * But it is supported in every used dbms for single line insert, even if the metadata says otherwise.
-       * If the getGeneratedKeys are not supported first we insert a random batchId in the TXN_META_INFO field,
-       * then the keys are selected beck with that batchid.
+       * The commit was too slow, we cannot allow this to continue (except if it is read only,
+       * since that cannot cause dirty reads).
+       * When calculating the snapshot for a given transaction, we look back for possible open transactions
+       * (that are not yet committed in the TXNS table), for the TXN_OPENTXN_TIMEOUT period.
+       * We cannot allow a write transaction that was slower than TXN_OPENTXN_TIMEOUT to continue,
+       * because there can be other transactions running that didn't consider this transactionId open,
+       * this could cause dirty reads.
        */
-      boolean genKeySupport = dbProduct.supportsGetGeneratedKeys();
-      genKeySupport = genKeySupport || (numTxns == 1);
-
-      String insertQuery = String.format(TXNS_INSERT_QRY, getEpochFn(dbProduct), getEpochFn(dbProduct));
-      LOG.debug("Going to execute insert <{}>", insertQuery);
-      try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) {
-        String state = genKeySupport ? TxnStatus.OPEN.getSqlConst() : TXN_TMP_STATE;
-        if (numTxns == 1) {
-          ps.setString(1, state);
-          ps.setString(2, rqst.getUser());
-          ps.setString(3, rqst.getHostname());
-          ps.setInt(4, txnType.getValue());
-          txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, false));
-        } else {
-          for (int i = 0; i < numTxns; ++i) {
-            ps.setString(1, state);
-            ps.setString(2, rqst.getUser());
-            ps.setString(3, rqst.getHostname());
-            ps.setInt(4, txnType.getValue());
-            ps.addBatch();
-
-            if ((i + 1) % maxBatchSize == 0) {
-              txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
-            }
-          }
-          if (numTxns % maxBatchSize != 0) {
-            txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
-          }
-        }
-      }
-
-      assert txnIds.size() == numTxns;
-
-      addTxnToMinHistoryLevel(dbConn, txnIds, minOpenTxnId);
-
-      if (isReplayedReplTxn) {
-        List<String> rowsRepl = new ArrayList<>(numTxns);
-        List<String> params = Collections.singletonList(rqst.getReplPolicy());
-        List<List<String>> paramsList = new ArrayList<>(numTxns);
-        for (int i = 0; i < numTxns; i++) {
-          rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
-          paramsList.add(params);
-        }
-
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-                "\"REPL_TXN_MAP\" (\"RTM_REPL_POLICY\", \"RTM_SRC_TXN_ID\", \"RTM_TARGET_TXN_ID\")", rowsRepl,
-                paramsList);
-        for (PreparedStatement pst : insertPreparedStmts) {
-          pst.execute();
-        }
-      }
-
-      if (transactionalListeners != null && !isHiveReplTxn) {
-        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, txnType), dbConn, sqlGenerator);
-      }
-      return txnIds;
-    } finally {
-      if (insertPreparedStmts != null) {
-        for (PreparedStatement pst : insertPreparedStmts) {
-          pst.close();
-        }
-      }
-    }
-  }
+      LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds);
 
-  private List<Long> executeTxnInsertBatchAndExtractGeneratedKeys(Connection dbConn, boolean genKeySupport,
-      PreparedStatement ps, boolean batch) throws SQLException {
-    List<Long> txnIds = new ArrayList<>();
-    if (batch) {
-      ps.executeBatch();
-    } else {
-      // For slight performance advantage we do not use the executeBatch, when we only have one row
-      ps.execute();
-    }
-    if (genKeySupport) {
-      try (ResultSet generatedKeys = ps.getGeneratedKeys()) {
-        while (generatedKeys.next()) {
-          txnIds.add(generatedKeys.getLong(1));
-        }
-      }
-    } else {
-      try (PreparedStatement pstmt =
-          dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) {
-        pstmt.setString(1, TXN_TMP_STATE);
-        try (ResultSet rs = pstmt.executeQuery()) {
-          while (rs.next()) {
-            txnIds.add(rs.getLong(1));
-          }
+      if (!txnIds.isEmpty()) {
+        try {
Review Comment:
   done
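The comments in the diff describe the TXN_OPENTXN_TIMEOUT rule: a write transaction whose commit into the TXNS table took longer than the timeout must be rolled back, because concurrent snapshot calculations may no longer consider its id open; read-only transactions are exempt since they cannot cause dirty reads. A minimal sketch of that decision, using hypothetical stand-in names (`mustAbort`, a trimmed `TxnType` enum) rather than Hive's actual API:

```java
// Sketch of the OpenTxnTimeOut check discussed above. All names here are
// hypothetical illustrations, not Hive's real TxnHandler code.
public class OpenTxnTimeoutCheck {
  enum TxnType { DEFAULT, READ_ONLY, REPL_CREATED }

  // True when freshly opened transactions must be deleted and the open retried:
  // the TXNS commit outlived TXN_OPENTXN_TIMEOUT and the txn can write.
  static boolean mustAbort(TxnType txnType, long elapsedMillis, long openTxnTimeoutMillis) {
    // Read-only transactions are exempt: they cannot cause dirty reads.
    return txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeoutMillis;
  }

  public static void main(String[] args) {
    System.out.println(mustAbort(TxnType.DEFAULT, 1500, 1000));   // slow write txn -> true
    System.out.println(mustAbort(TxnType.READ_ONLY, 1500, 1000)); // slow but read-only -> false
    System.out.println(mustAbort(TxnType.DEFAULT, 200, 1000));    // fast write txn -> false
  }
}
```

The stopwatch in the diff measures exactly this window: from acquiring the sequence value to committing the row in TXNS.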
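The removed `openTxns` helper relies on two small pieces of logic worth making explicit: `getGeneratedKeys` after a multi-row insert is not portable across DBMSs (but single-row inserts return keys everywhere, hence the `numTxns == 1` escape hatch), and batched inserts are flushed every `maxBatchSize` rows with one final flush for any remainder. A sketch of both decisions in isolation, with hypothetical names (`genKeySupported`, `computeFlushes`) standing in for the deleted code:

```java
// Illustrative stand-ins for the batching logic in the removed openTxns
// helper; not Hive code.
public class TxnInsertBatching {
  // Mirrors: genKeySupport = dbProduct.supportsGetGeneratedKeys() || (numTxns == 1).
  static boolean genKeySupported(boolean dbSupportsMultiRowKeys, int numTxns) {
    return dbSupportsMultiRowKeys || numTxns == 1;
  }

  // Counts how many executeBatch() calls the loop in the diff would issue:
  // one per full batch of maxBatchSize rows, plus one for a trailing partial batch.
  static int computeFlushes(int numTxns, int maxBatchSize) {
    int flushes = 0;
    for (int i = 0; i < numTxns; ++i) {
      if ((i + 1) % maxBatchSize == 0) {
        flushes++; // full batch flushed inside the loop
      }
    }
    if (numTxns % maxBatchSize != 0) {
      flushes++;   // remainder flushed after the loop
    }
    return flushes;
  }

  public static void main(String[] args) {
    System.out.println(genKeySupported(false, 1)); // single-row insert always returns keys
    System.out.println(computeFlushes(10, 4));     // 2 full batches + 1 partial = 3
  }
}
```

When `genKeySupported` is false, the deleted code falls back to inserting rows with a temporary state and selecting the ids back afterwards, as its comment describes.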



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

