[ 
https://issues.apache.org/jira/browse/HIVE-26316?focusedWorklogId=781071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781071
 ]

ASF GitHub Bot logged work on HIVE-26316:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/22 10:13
            Start Date: 14/Jun/22 10:13
    Worklog Time Spent: 10m 
      Work Description: pvary commented on code in PR #3367:
URL: https://github.com/apache/hive/pull/3367#discussion_r896637183


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java:
##########
@@ -90,6 +98,59 @@ public static boolean checkFileExists(Path dumpPath, 
HiveConf conf, String fileN
     return fs.exists(new Path(dumpPath, fileName));
   }
 
+  public static void prepareAbortTxnsFile(List<NotificationEvent> 
notificationEvents, Set<Long> allOpenTxns,
+                                          Path dumpPath, HiveConf conf) throws 
SemanticException {
+    if (notificationEvents.size() == 0) {
+      return;
+    }
+    Set<Long> txnsOpenedPostCurrEventId = new HashSet<>();
+    MessageDeserializer deserializer = 
ReplUtils.getEventDeserializer(notificationEvents.get(0));
+    for (NotificationEvent event: notificationEvents) {
+      if (event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) {
+        OpenTxnMessage msg = 
deserializer.getOpenTxnMessage(event.getMessage());
+        txnsOpenedPostCurrEventId.addAll(msg.getTxnIds());
+        allOpenTxns.removeAll(msg.getTxnIds());
+      } else if (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)) {
+        AbortTxnMessage msg = 
deserializer.getAbortTxnMessage(event.getMessage());
+        if (!txnsOpenedPostCurrEventId.contains(msg.getTxnId())) {
+          allOpenTxns.add(msg.getTxnId());
+        }
+      } else if (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) 
{
+        CommitTxnMessage msg = 
deserializer.getCommitTxnMessage(event.getMessage());
+        if (!txnsOpenedPostCurrEventId.contains(msg.getTxnId())) {
+          allOpenTxns.add(msg.getTxnId());
+        }
+      }
+    }
+    if (!allOpenTxns.isEmpty()) {
+      Utils.writeOutput(flattenListToString(allOpenTxns), new Path(dumpPath, 
ABORT_TXNS_FILE), conf);
+    }
+  }
+
+  public static List<Long> getTxnIdFromAbortTxnsFile(Path dumpPath, HiveConf 
conf) throws IOException {
+    String input;
+    Path abortTxnFile = new Path(dumpPath, ABORT_TXNS_FILE);
+    FileSystem fs = abortTxnFile.getFileSystem(conf);
+    try (FSDataInputStream stream = fs.open(abortTxnFile);) {
+      input = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    return unflattenListFromString(input);
+  }
+
+  public static String flattenListToString(Set<Long> list) {

Review Comment:
   nit: do we need a one-liner method which is only used in one place?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 781071)
    Time Spent: 50m  (was: 40m)

> Handle dangling open txns on both src & tgt in unplanned failover.
> ------------------------------------------------------------------
>
>                 Key: HIVE-26316
>                 URL: https://issues.apache.org/jira/browse/HIVE-26316
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to