[
https://issues.apache.org/jira/browse/HIVE-26316?focusedWorklogId=781071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781071
]
ASF GitHub Bot logged work on HIVE-26316:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jun/22 10:13
Start Date: 14/Jun/22 10:13
Worklog Time Spent: 10m
Work Description: pvary commented on code in PR #3367:
URL: https://github.com/apache/hive/pull/3367#discussion_r896637183
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java:
##########
@@ -90,6 +98,59 @@ public static boolean checkFileExists(Path dumpPath,
HiveConf conf, String fileN
return fs.exists(new Path(dumpPath, fileName));
}
+ public static void prepareAbortTxnsFile(List<NotificationEvent>
notificationEvents, Set<Long> allOpenTxns,
+ Path dumpPath, HiveConf conf) throws
SemanticException {
+ if (notificationEvents.size() == 0) {
+ return;
+ }
+ Set<Long> txnsOpenedPostCurrEventId = new HashSet<>();
+ MessageDeserializer deserializer =
ReplUtils.getEventDeserializer(notificationEvents.get(0));
+ for (NotificationEvent event: notificationEvents) {
+ if (event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) {
+ OpenTxnMessage msg =
deserializer.getOpenTxnMessage(event.getMessage());
+ txnsOpenedPostCurrEventId.addAll(msg.getTxnIds());
+ allOpenTxns.removeAll(msg.getTxnIds());
+ } else if (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)) {
+ AbortTxnMessage msg =
deserializer.getAbortTxnMessage(event.getMessage());
+ if (!txnsOpenedPostCurrEventId.contains(msg.getTxnId())) {
+ allOpenTxns.add(msg.getTxnId());
+ }
+ } else if (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT))
{
+ CommitTxnMessage msg =
deserializer.getCommitTxnMessage(event.getMessage());
+ if (!txnsOpenedPostCurrEventId.contains(msg.getTxnId())) {
+ allOpenTxns.add(msg.getTxnId());
+ }
+ }
+ }
+ if (!allOpenTxns.isEmpty()) {
+ Utils.writeOutput(flattenListToString(allOpenTxns), new Path(dumpPath,
ABORT_TXNS_FILE), conf);
+ }
+ }
+
+ public static List<Long> getTxnIdFromAbortTxnsFile(Path dumpPath, HiveConf
conf) throws IOException {
+ String input;
+ Path abortTxnFile = new Path(dumpPath, ABORT_TXNS_FILE);
+ FileSystem fs = abortTxnFile.getFileSystem(conf);
+ try (FSDataInputStream stream = fs.open(abortTxnFile);) {
+ input = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+ return unflattenListFromString(input);
+ }
+
+ public static String flattenListToString(Set<Long> list) {
Review Comment:
nit: do we need a one-liner method which is only used in one place?
Issue Time Tracking
-------------------
Worklog Id: (was: 781071)
Time Spent: 50m (was: 40m)
> Handle dangling open txns on both src & tgt in unplanned failover.
> ------------------------------------------------------------------
>
> Key: HIVE-26316
> URL: https://issues.apache.org/jira/browse/HIVE-26316
> Project: Hive
> Issue Type: Sub-task
> Reporter: Haymant Mangla
> Assignee: Haymant Mangla
> Priority: Major
> Labels: pull-request-available
> Time Spent: 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)