pkumarsinha commented on a change in pull request #2357:
URL: https://github.com/apache/hive/pull/2357#discussion_r679197638
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3101,10 +3102,25 @@ private void constructOneLBLocationMap(FileStatus fSta,
// For acid table, add the acid_write event with file list at the time
of load itself. But
// it should be done after partition is created.
+ List<WriteNotificationLogRequest> requestList = new ArrayList<>();
for (Entry<Path, PartitionDetails> entry :
partitionDetailsMap.entrySet()) {
PartitionDetails partitionDetails = entry.getValue();
if (isTxnTable && partitionDetails.newFiles != null) {
- addWriteNotificationLog(tbl, partitionDetails.fullSpec,
partitionDetails.newFiles, writeId);
+ addWriteNotificationLog(tbl, partitionDetails.fullSpec,
partitionDetails.newFiles,
+ writeId, requestList);
+ }
+ }
+ if (requestList.size() != 0) {
+ WriteNotificationLogBatchRequest rqst = new
WriteNotificationLogBatchRequest(tbl.getCatName(), tbl.getDbName(),
+ tbl.getTableName(), requestList);
+ try {
+ get(conf).getSynchronizedMSC().addWriteNotificationLogInBatch(rqst);
+ } catch (TException e) {
+ // For older HMS, if the batch API is not supported, fall back to
older API.
+ LOG.info("addWriteNotificationLogInBatch failed with ", e);
Review comment:
It would help to verify once that this call is idempotent on failure. As
written, every exception (an SQLException, for example) would be treated as
a version incompatibility between HS2 and HMS, and would trigger the
fallback to the older API.
##########
File path:
hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
##########
@@ -1104,97 +1138,160 @@ private long getNextEventId(Connection con,
SQLGenerator sqlGenerator, long numV
return nextSequenceValue.get();
}
- private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent
acidWriteEvent, Connection dbConn,
- SQLGenerator sqlGenerator, AcidWriteMessage
msg) throws MetaException, SQLException {
- LOG.debug("DbNotificationListener: adding write notification log for :
{}", event.getMessage());
+ private void addWriteNotificationLog(List<NotificationEvent> eventBatch,
List<AcidWriteEvent> acidWriteEventList,
+ Connection dbConn, SQLGenerator
sqlGenerator, List<AcidWriteMessage> msgBatch)
+ throws MetaException, SQLException {
+ LOG.debug("DbNotificationListener: adding write notification log for :
{}", eventBatch);
assert ((dbConn != null) && (sqlGenerator != null));
- Statement stmt = null;
- PreparedStatement pst = null;
- ResultSet rs = null;
- String dbName = acidWriteEvent.getDatabase();
- String tblName = acidWriteEvent.getTable();
- String partition = acidWriteEvent.getPartition();
- String tableObj = msg.getTableObjStr();
- String partitionObj = msg.getPartitionObjStr();
- String files = ReplChangeManager.joinWithSeparator(msg.getFiles());
+ int numRows;
+ long maxRows = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
- try {
- stmt = dbConn.createStatement();
+ try (Statement stmt = dbConn.createStatement()) {
String st = sqlGenerator.getDbProduct().getPrepareTxnStmt();
if (st != null) {
stmt.execute(st);
}
+ } catch (Exception e) {
+ LOG.error("Failed to execute query ", e);
+ throw new MetaException(e.getMessage());
+ }
- String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\",
\"WNL_ID\" from" +
- " \"TXN_WRITE_NOTIFICATION_LOG\" " +
- "where \"WNL_DATABASE\" = ? " +
- "and \"WNL_TABLE\" = ? " + " and \"WNL_PARTITION\" = ?
" +
- "and \"WNL_TXNID\" = " +
Long.toString(acidWriteEvent.getTxnId()));
- List<String> params = Arrays.asList(dbName, tblName, partition);
- pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
- LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
- quoteString(dbName), quoteString(tblName),
quoteString(partition));
- rs = pst.executeQuery();
- if (!rs.next()) {
- // if rs is empty then no lock is taken and thus it can not cause
deadlock.
- long nextNLId = getNextNLId(dbConn, sqlGenerator,
-
"org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog", 1L);
- s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
- "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\",
\"WNL_DATABASE\", \"WNL_TABLE\", " +
- "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\",
" +
- "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES
(?,?,?,?,?,?,?,?,?,?)";
- closeStmt(pst);
- int currentTime = now();
- pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
- pst.setLong(1, nextNLId);
- pst.setLong(2, acidWriteEvent.getTxnId());
- pst.setLong(3, acidWriteEvent.getWriteId());
- pst.setString(4, dbName);
- pst.setString(5, tblName);
- pst.setString(6, partition);
- pst.setString(7, tableObj);
- pst.setString(8, partitionObj);
- pst.setString(9, files);
- pst.setInt(10, currentTime);
- LOG.info("Going to execute insert <" + s.replaceAll("\\?", "{}") +
">", nextNLId
- , acidWriteEvent.getTxnId(), acidWriteEvent.getWriteId(),
quoteString(dbName), quoteString(tblName),
- quoteString(partition), quoteString(tableObj),
quoteString(partitionObj), quoteString(files), currentTime);
- pst.execute();
- } else {
- String existingFiles = rs.getString(1);
- if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) {
- // If list of files are already present then no need to update it
again. This scenario can come in case of
- // retry done to the meta store for the same operation.
- LOG.info("file list " + files + " already present");
- return;
+ ResultSet rs = null;
+ String select = sqlGenerator.addForUpdateClause("select \"WNL_ID\",
\"WNL_FILES\" from" +
+ " \"TXN_WRITE_NOTIFICATION_LOG\" " +
+ "where \"WNL_DATABASE\" = ? " +
+ "and \"WNL_TABLE\" = ? " + " and \"WNL_PARTITION\" = ? " +
+ "and \"WNL_TXNID\" = ? ");
+ List<Integer> insertList = new ArrayList<>();
+ Map<Integer, Pair<Long, String>> updateMap = new HashMap<>();
+ try (PreparedStatement pst = dbConn.prepareStatement(select)) {
+ for (int i = 0; i < acidWriteEventList.size(); i++) {
+ String dbName = acidWriteEventList.get(i).getDatabase();
+ String tblName = acidWriteEventList.get(i).getTable();
+ String partition = acidWriteEventList.get(i).getPartition();
+ Long txnId = acidWriteEventList.get(i).getTxnId();
+
+ LOG.debug("Going to execute query <" + select.replaceAll("\\?", "{}")
+ ">",
+ quoteString(dbName), quoteString(tblName),
quoteString(partition));
+ pst.setString(1, dbName);
+ pst.setString(2, tblName);
+ pst.setString(3, partition);
+ pst.setLong(4, txnId);
+ rs = pst.executeQuery();
+ if (!rs.next()) {
+ insertList.add(i);
+ } else {
+ updateMap.put(i, new Pair<>(rs.getLong(1), rs.getString(2)));
}
- long nlId = rs.getLong(2);
- int currentTime = now();
- files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files,
existingFiles));
- s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = ?
," +
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to execute insert ", e);
+ throw new MetaException(e.getMessage());
+ } finally {
+ close(rs);
+ }
+
+ if (insertList.size() != 0) {
+ // if rs is empty then no lock is taken and thus it can not cause
deadlock.
+ long nextNLId = getNextNLId(dbConn, sqlGenerator,
+
"org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog",
insertList.size());
+
+ String insert = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
+ "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\",
\"WNL_TABLE\", " +
+ "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
+ "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES
(?,?,?,?,?,?,?,?,?,?)";
+ try (PreparedStatement pst =
dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(insert))) {
+ numRows = 0;
+ for (int idx : insertList) {
+ String tableObj = msgBatch.get(idx).getTableObjStr();
+ String partitionObj = msgBatch.get(idx).getPartitionObjStr();
+ String files =
ReplChangeManager.joinWithSeparator(msgBatch.get(idx).getFiles());
+ String dbName = acidWriteEventList.get(idx).getDatabase();
+ String tblName = acidWriteEventList.get(idx).getTable();
+ String partition = acidWriteEventList.get(idx).getPartition();
+ int currentTime = now();
+
+ pst.setLong(1, nextNLId++);
+ pst.setLong(2, acidWriteEventList.get(idx).getTxnId());
+ pst.setLong(3, acidWriteEventList.get(idx).getWriteId());
+ pst.setString(4, dbName);
+ pst.setString(5, tblName);
+ pst.setString(6, partition);
+ pst.setString(7, tableObj);
+ pst.setString(8, partitionObj);
+ pst.setString(9, files);
+ pst.setInt(10, currentTime);
+ LOG.debug("Going to execute insert <" + insert.replaceAll("\\?",
"{}") + ">", nextNLId
+ , acidWriteEventList.get(idx).getTxnId(),
acidWriteEventList.get(idx).getWriteId()
+ , quoteString(dbName), quoteString(tblName),
+ quoteString(partition), quoteString(tableObj),
quoteString(partitionObj), quoteString(files), currentTime);
+ pst.addBatch();
+ numRows++;
+ if (numRows == maxRows) {
+ pst.executeBatch();
+ numRows = 0;
+ }
+ }
+
+ if (numRows != 0) {
+ pst.executeBatch();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to execute insert ", e);
+ throw new MetaException(e.getMessage());
+ }
+ }
+
+ if (updateMap.size() != 0) {
+ String update = "update \"TXN_WRITE_NOTIFICATION_LOG\" set
\"WNL_TABLE_OBJ\" = ? ," +
" \"WNL_PARTITION_OBJ\" = ? ," +
" \"WNL_FILES\" = ? ," +
" \"WNL_EVENT_TIME\" = ?" +
" where \"WNL_ID\" = ?";
- closeStmt(pst);
- pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
- pst.setString(1, tableObj);
- pst.setString(2, partitionObj);
- pst.setString(3, files);
- pst.setInt(4, currentTime);
- pst.setLong(5, nlId);
- LOG.info("Going to execute update <" + s.replaceAll("\\?", "{}") +
">", quoteString(tableObj),
- quoteString(partitionObj), quoteString(files), currentTime,
nlId);
- pst.executeUpdate();
+ try (PreparedStatement pst =
dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(update))) {
+ numRows = 0;
+ for (Map.Entry entry : updateMap.entrySet()) {
+ int idx = (int) entry.getKey();
+ Pair<Long, String> nlIdInfo = (Pair<Long, String>) entry.getValue();
+ String tableObj = msgBatch.get(idx).getTableObjStr();
+ String partitionObj = msgBatch.get(idx).getPartitionObjStr();
+ String files =
ReplChangeManager.joinWithSeparator(msgBatch.get(idx).getFiles());
+ String existingFiles = nlIdInfo.second;
+ long nlId = nlIdInfo.first;
+ int currentTime = now();
+
+ if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files)))
{
+ // If list of files are already present then no need to update it
again. This scenario can come in case of
+ // retry done to the meta store for the same operation.
+ LOG.info("file list " + files + " already present");
+ continue;
+ }
+
+ files =
ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles));
+
+ pst.setString(1, tableObj);
+ pst.setString(2, partitionObj);
+ pst.setString(3, files);
+ pst.setInt(4, currentTime);
+ pst.setLong(5, nlId);
+ LOG.debug("Going to execute update <" + update.replaceAll("\\?",
"{}") + ">",
+ quoteString(tableObj), quoteString(partitionObj),
quoteString(files), currentTime, nlId);
+ pst.addBatch();
+ numRows++;
+ if (numRows == maxRows) {
+ pst.executeBatch();
+ numRows = 0;
+ }
+ }
+
+ if (numRows != 0) {
+ pst.executeBatch();
+ }
+ } catch (Exception e) {
Review comment:
Should we catch SQLException here instead of the generic Exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]