This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 179caf754 OOZIE-3666 Oozie log streaming bug when log timestamps are 
the same on multiple Oozie servers (jmakai via dionusos)
179caf754 is described below

commit 179caf7547410cdaddb99a01b430561d7722cc3b
Author: Denes Bodo <[email protected]>
AuthorDate: Thu Sep 1 14:53:13 2022 +0200

    OOZIE-3666 Oozie log streaming bug when log timestamps are the same on 
multiple Oozie servers (jmakai via dionusos)
---
 .../oozie/service/ZKXLogStreamingService.java      | 94 +++++++++++++++++-----
 .../oozie/service/TestZKXLogStreamingService.java  |  6 +-
 release-log.txt                                    |  1 +
 3 files changed, 81 insertions(+), 20 deletions(-)

diff --git 
a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java 
b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index eac3b38db..dd528bb1d 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,6 +24,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -139,7 +140,7 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
         catch (Exception ex) {
             throw new IOException("Issue communicating with ZooKeeper: " + 
ex.getMessage(), ex);
         }
-        List<TimestampedMessageParser> parsers = new 
ArrayList<TimestampedMessageParser>(oozies.size());
+        List<TimestampedMessageParser> parsers = new 
ArrayList<>(oozies.size());
         try {
             // Create a BufferedReader for getting the logs of each server and 
put them in a TimestampedMessageParser
             for (ServiceInstance<Map> oozie : oozies) {
@@ -156,8 +157,8 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
                     String otherUrl = 
oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
                     String jobId = 
logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
                     try {
-                     // It's important that we specify ALL_SERVERS_PARAM=false 
in the GET request to prevent the other Oozie
-                     // Server from trying aggregate logs from the other Oozie 
servers (and creating an infinite recursion)
+                        // It's important that we specify 
ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
+                        // Server from trying aggregate logs from the other 
Oozie servers (and creating an infinite recursion)
                         final String url = otherUrl + "/v" + 
OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
                                 + "/" + jobId + "?" + 
RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
                                 + "&" + RestConstants.ALL_SERVER_REQUEST + 
"=false"
@@ -198,7 +199,8 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
             }
             // Add a message about any servers we couldn't contact
             if (!badOozies.isEmpty()) {
-                writer.write("Unable to contact the following Oozie Servers 
for logs (log information may be incomplete):\n");
+                writer.write("Unable to contact the following Oozie Servers 
for logs (log information may be " +
+                        "incomplete):\n");
                 for (String badOozie : badOozies) {
                     writer.write("     ");
                     writer.write(badOozie);
@@ -208,26 +210,33 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
                 writer.flush();
             }
 
-            // If it's just the one server (this server), then we don't need 
to do any more processing and can just copy it directly
+            // If it's just the one server (this server), then we don't need 
to do any more processing and can just
+            // copy it directly
             if (parsers.size() == 1) {
                 TimestampedMessageParser parser = parsers.get(0);
                 parser.processRemaining(writer, logStreamer);
             }
             else {
-                // Now that we have a Reader for each server to get the logs 
from that server, we have to collate them.  Within each
-                // server, the logs should already be in the correct order, so 
we can take advantage of that.  We'll use the
-                // BufferedReaders to read the messages from the logs of each 
server and put them in order without having to bring
-                // every message into memory at the same time.
-                TreeMap<String, TimestampedMessageParser> timestampMap = new 
TreeMap<String, TimestampedMessageParser>();
-                // populate timestampMap with initial values
+                // Now that we have a Reader for each server to get the logs 
from that server, we have to collate them.
+                // Within each server, the logs should already be in the 
correct order, so we can take advantage of
+                // that.  We'll use the BufferedReaders to read the messages 
from the logs of each server and put them
+                // in order without having to bring every message into memory 
at the same time.
+
+                // The created TreeMap is capable of handling duplicate keys 
(timestamps) by maintaining a
+                // collection of values per key. This is important as the 
timestamps can be exactly the same in
+                // multiple Oozie servers which would end up with missing 
parsed log messages in case duplicated
+                // keys are not allowed.
+                Map<String, List<TimestampedMessageParser>> timestampTreeMap = 
new TreeMap<>();
+
+                // populate timestampMultimap with initial values
                 for (TimestampedMessageParser parser : parsers) {
                     if (parser.increment()) {
-                        timestampMap.put(parser.getLastTimestamp(), parser);
+                        putParser(timestampTreeMap, parser.getLastTimestamp(), 
parser);
                     }
                 }
-                while (timestampMap.size() > 1) {
+                while (getParserAmount(timestampTreeMap) > 1) {
                     // The first entry will be the earliest based on the 
timestamp (also removes it) from the map
-                    TimestampedMessageParser earliestParser = 
timestampMap.pollFirstEntry().getValue();
+                    TimestampedMessageParser earliestParser = 
pollFirstParser(timestampTreeMap);
                     // Write the message from that parser at that timestamp
                     
writer.write(StringEscapeUtils.escapeHtml4(earliestParser.getLastMessage()));
                     if 
(logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
@@ -236,12 +245,13 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
                     // Increment that parser to read the next message
                     if (earliestParser.increment()) {
                         // If it still has messages left, put it back in the 
map with the new last timestamp for it
-                        timestampMap.put(earliestParser.getLastTimestamp(), 
earliestParser);
+                        putParser(timestampTreeMap, 
earliestParser.getLastTimestamp(), earliestParser);
                     }
                 }
-                // If there's only one parser left in the map, then we can 
simply copy the rest of its lines directly to be faster
-                if (timestampMap.size() == 1) {
-                    TimestampedMessageParser parser = 
timestampMap.values().iterator().next();
+                // If there's only one parser left in the map, then we can 
simply copy the rest of its lines directly
+                // to be faster
+                if (getParserAmount(timestampTreeMap) == 1) {
+                    TimestampedMessageParser parser = 
timestampTreeMap.values().iterator().next().get(0);
                     // don't forget the last message read by the parser
                     
writer.write(StringEscapeUtils.escapeHtml4(parser.getLastMessage()));
                     parser.processRemaining(writer, logStreamer);
@@ -254,4 +264,50 @@ public class ZKXLogStreamingService extends 
XLogStreamingService implements Serv
             }
         }
     }
+
+    /**
+     * Removes and returns the first parser stored under the first-iterated (smallest) timestamp key; the key 
itself is removed once its last parser has been taken.
+     * NOTE(review): relies on entrySet() iterating in ascending key order (true for the TreeMap built by the caller, but not guaranteed by the Map parameter type); an empty map makes iterator().next() throw NoSuchElementException.
+     * @param timestampTreeMap timestamp-keyed map of parser lists (expected to be sorted by key) from which a 
parser is polled
+     * @return the parser at index 0 of the value list under the first-iterated (earliest) key
+     */
+    private TimestampedMessageParser pollFirstParser(Map<String, 
List<TimestampedMessageParser>> timestampTreeMap) {
+        Iterator<Map.Entry<String, List<TimestampedMessageParser>>> entry = 
timestampTreeMap.entrySet().iterator();
+        List<TimestampedMessageParser> entryList = entry.next().getValue(); // first entry = earliest key for a TreeMap
+
+        TimestampedMessageParser parser = entryList.get(0);
+
+        if (entryList.size() == 1) {
+            entry.remove(); // last parser for this timestamp: drop the whole key via the iterator
+        } else {
+            entryList.remove(parser); // other parsers share this timestamp: keep the key, drop only this parser
+        }
+        return parser;
+    }
+
+    /**
+     * Appends the given parser to the list stored under {@code timestamp}, creating that list on first use, so 
that several parsers sharing an identical timestamp are all kept instead of overwriting one another.
+     *
+     * @param timestampTreeMap the timestamp-keyed map of parser lists (a TreeMap at the call sites) to which the 
parser is added
+     * @param timestamp timestamp string used as the map key (callers pass the parser's 
getLastTimestamp())
+     * @param parser the TimestampedMessageParser object to be appended to the list under that 
key
+     */
+    private void putParser(Map<String, List<TimestampedMessageParser>> 
timestampTreeMap, String timestamp,
+                                  TimestampedMessageParser parser) {
+        timestampTreeMap.computeIfAbsent(timestamp, (unused) -> new 
ArrayList<>()).add(parser);
+    }
+
+    /**
+     * Counts the parsers held in the given map by summing the sizes of all value lists, so parsers sharing a 
timestamp key are each counted (Map#size() would count such a key only once).
+     * NOTE(review): this walks every entry and is evaluated once per merged log message by the caller; a running counter could avoid the repeated scan.
+     * @param timestampTreeMap the timestamp-keyed map of parser lists to be counted
+     * @return total number of TimestampedMessageParser objects across all keys
+     */
+    private int getParserAmount(Map<String, List<TimestampedMessageParser>> 
timestampTreeMap) {
+        int parserAmount = 0;
+        for (Map.Entry<String, List<TimestampedMessageParser>> entry : 
timestampTreeMap.entrySet()) {
+            parserAmount += entry.getValue().size();
+        }
+        return parserAmount;
+    }
 }
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java 
b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index 9119e6c17..4b9cf1b5f 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -272,6 +272,10 @@ public class TestZKXLogStreamingService extends 
ZKXTestCase {
                                 
Services.get().get(XLogService.class).getOozieLogName());
         logFile.getParentFile().mkdirs();
         Writer logWriter = new OutputStreamWriter(new 
FileOutputStream(logFile), StandardCharsets.UTF_8);
+
+        // Notice below that the timestamps (2013-06-10 10:25:44,008) of the 
first log messages of the "Oozie servers"
+        // are the same. This is intentional and is supposed to test a corner 
case in the mechanism of the log streaming
+
         // local logs
         logWriter.append("2013-06-10 10:25:44,008 WARN HiveActionExecutor:542 
SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] "
                 + "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W] 
ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] "
@@ -285,7 +289,7 @@ public class TestZKXLogStreamingService extends ZKXTestCase 
{
         logWriter.close();
         // logs to be returned by another "Oozie server"
         DummyLogStreamingServlet.logs =
-                "2013-06-10 10:25:43,575 WARN ActionStartXCommand:542 
SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+                "2013-06-10 10:25:44,008 WARN ActionStartXCommand:542 
SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
                 + "JOB[0000003-130610102426873-oozie-rkan-W] 
ACTION[0000003-130610102426873-oozie-rkan-W@:start:] "
                 + "[***0000003-130610102426873-oozie-rkan-W@:start:***]Action 
status=DONE _L1_"
                 + "\n"
diff --git a/release-log.txt b/release-log.txt
index 909db42ec..2122ccfab 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3666 Oozie log streaming bug when log timestamps are the same on 
multiple Oozie servers (jmakai via dionusos)
 OOZIE-3661 Oozie cannot handle environment variables with key=value content 
(dionusos via asalamon74)
 OOZIE-3659 oozieUrl ambiguous port number in TestOozieCLI.java (AlexaD via 
dionusos)
 OOZIE-3658 Fix TestJMSJobEventListener#testConnectionDrop flakiness again 
(dionusos via asalamon74)

Reply via email to