pvary commented on code in PR #3367:
URL: https://github.com/apache/hive/pull/3367#discussion_r896638329
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java:
##########
@@ -90,6 +98,59 @@ public static boolean checkFileExists(Path dumpPath,
HiveConf conf, String fileN
return fs.exists(new Path(dumpPath, fileName));
}
+ public static void prepareAbortTxnsFile(List<NotificationEvent>
notificationEvents, Set<Long> allOpenTxns,
+ Path dumpPath, HiveConf conf) throws
SemanticException {
+ if (notificationEvents.size() == 0) {
+ return;
+ }
+ Set<Long> txnsOpenedPostCurrEventId = new HashSet<>();
+ MessageDeserializer deserializer =
ReplUtils.getEventDeserializer(notificationEvents.get(0));
+ for (NotificationEvent event: notificationEvents) {
+ if (event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) {
+ OpenTxnMessage msg =
deserializer.getOpenTxnMessage(event.getMessage());
+ txnsOpenedPostCurrEventId.addAll(msg.getTxnIds());
+ allOpenTxns.removeAll(msg.getTxnIds());
+ } else if (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)) {
+ AbortTxnMessage msg =
deserializer.getAbortTxnMessage(event.getMessage());
+ if (!txnsOpenedPostCurrEventId.contains(msg.getTxnId())) {
+ allOpenTxns.add(msg.getTxnId());
+ }
+ } else if (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT))
{
+ CommitTxnMessage msg =
deserializer.getCommitTxnMessage(event.getMessage());
+ if (!txnsOpenedPostCurrEventId.contains(msg.getTxnId())) {
+ allOpenTxns.add(msg.getTxnId());
+ }
+ }
+ }
+ if (!allOpenTxns.isEmpty()) {
+ Utils.writeOutput(flattenListToString(allOpenTxns), new Path(dumpPath,
ABORT_TXNS_FILE), conf);
+ }
+ }
+
+ public static List<Long> getTxnIdFromAbortTxnsFile(Path dumpPath, HiveConf
conf) throws IOException {
+ String input;
+ Path abortTxnFile = new Path(dumpPath, ABORT_TXNS_FILE);
+ FileSystem fs = abortTxnFile.getFileSystem(conf);
+ try (FSDataInputStream stream = fs.open(abortTxnFile);) {
+ input = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+ return unflattenListFromString(input);
+ }
+
+ public static String flattenListToString(Set<Long> list) {
Review Comment:
If we need these helper methods (e.g. `flattenListToString`), we still might want to keep them `private`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]