deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1309019954


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -1804,263 +572,56 @@ protected void 
updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, Tx
     }
   }
 
-  private Optional<CompactionInfo> getCompactionByTxnId(Connection dbConn, 
long txnid) throws SQLException, MetaException {
-    CompactionInfo info = null;
-    try (PreparedStatement pStmt = 
dbConn.prepareStatement(SELECT_COMPACTION_QUEUE_BY_TXN_ID)) {
-      pStmt.setLong(1, txnid);
-      try (ResultSet rs = pStmt.executeQuery()) {
-        if (rs.next()) {
-          info = CompactionInfo.loadFullFromCompactionQueue(rs);
-        }
-      }
-    }
-    return Optional.ofNullable(info);
-  }
-
   @Override
   public Optional<CompactionInfo> getCompactionByTxnId(long txnId) throws 
MetaException {
-    Connection dbConn = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        return getCompactionByTxnId(dbConn, txnId);
-      } catch (SQLException e) {
-        LOG.error("Unable to getCompactionByTxnId", e);
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "getCompactionByTxnId");
-        throw new MetaException("Unable to execute getCompactionByTxnId() " + 
e.getMessage());
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      return getCompactionByTxnId(txnId);
-    }
+    return Optional.ofNullable(getCompactionByTxnId(jdbcTemplate, txnId));
+  }
+
+  private CompactionInfo getCompactionByTxnId(DataSourceWrapper 
dataSourceWrapper, long txnId) throws MetaException {
+    return dataSourceWrapper.execute( new GetCompactionInfoHandler(txnId, 
true)); 
   }
 
   @Override
-  protected void createCommitNotificationEvent(Connection dbConn, long txnid, 
TxnType txnType)
+  protected void createCommitNotificationEvent(DataSourceWrapper 
dataSourceWrapper, long txnid, TxnType txnType)
       throws MetaException, SQLException {
-    super.createCommitNotificationEvent(dbConn, txnid, txnType);
+    super.createCommitNotificationEvent(dataSourceWrapper, txnid, txnType);
     if (transactionalListeners != null) {
-      Optional<CompactionInfo> compactionInfo = getCompactionByTxnId(dbConn, 
txnid);
-      if (compactionInfo.isPresent()) {
+      //Please note that TxnHandler and CompactionTxnHandler are using 
different DataSources (to have different pools).
+      //This call must use the same transaction and connection as 
TxnHandler.commitTxn(), therefore we are passing the 
+      //datasource wrapper comming from TxnHandler. Without this, the 
getCompactionByTxnId(long txnId) call would be
+      //executed using a different connection obtained from 
CompactionTxnHandler's own datasourceWrapper. 
+      CompactionInfo compactionInfo = getCompactionByTxnId(dataSourceWrapper, 
txnid);
+      if (compactionInfo != null) {
         MetaStoreListenerNotifier
             .notifyEventWithDirectSql(transactionalListeners, 
EventMessage.EventType.COMMIT_COMPACTION,
-                new CommitCompactionEvent(txnid, compactionInfo.get()), 
dbConn, sqlGenerator);
+                new CommitCompactionEvent(txnid, compactionInfo), 
dataSourceWrapper.getConnection(), sqlGenerator);
       } else {
         LOG.warn("No compaction queue record found for Compaction type 
transaction commit. txnId:" + txnid);
       }
     }
   }
-
+  
   @Override
   public boolean updateCompactionMetricsData(CompactionMetricsData data) 
throws MetaException {
-    Connection dbConn = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        boolean updateRes;
-        CompactionMetricsData prevMetricsData = getCompactionMetricsData(data, 
dbConn);
-        if (data.getMetricValue() >= data.getThreshold()) {
-          if (prevMetricsData != null) {
-            updateRes = updateCompactionMetricsData(dbConn, data, 
prevMetricsData);
-          } else {
-            updateRes = createCompactionMetricsData(dbConn, data);
-          }
-        } else {
-          if (prevMetricsData != null) {
-            updateRes =
-                removeCompactionMetricsData(dbConn, data.getDbName(), 
data.getTblName(), data.getPartitionName(), data.getMetricType());
-          } else {
-            return true;
-          }
-        }
-        return updateRes;
-      } catch (SQLException e) {
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "updateCompactionMetricsData(" + data + ")");
-        throw new MetaException("Unable to execute 
updateCompactionMetricsData()" + e.getMessage());
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      updateCompactionMetricsData(data);
-    }
-    return false;
+    return new UpdateCompactionMetricsDataFunction(data).execute(jdbcTemplate);

Review Comment:
   jdbcTemplate.execute(UpdateCompactionMetricsDataFunction(data))



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to