This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 179caf754 OOZIE-3666 Oozie log streaming bug when log timestamps are
the same on multiple Oozie servers (jmakai via dionusos)
179caf754 is described below
commit 179caf7547410cdaddb99a01b430561d7722cc3b
Author: Denes Bodo <[email protected]>
AuthorDate: Thu Sep 1 14:53:13 2022 +0200
OOZIE-3666 Oozie log streaming bug when log timestamps are the same on
multiple Oozie servers (jmakai via dionusos)
---
.../oozie/service/ZKXLogStreamingService.java | 94 +++++++++++++++++-----
.../oozie/service/TestZKXLogStreamingService.java | 6 +-
release-log.txt | 1 +
3 files changed, 81 insertions(+), 20 deletions(-)
diff --git
a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index eac3b38db..dd528bb1d 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,6 +24,7 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -139,7 +140,7 @@ public class ZKXLogStreamingService extends
XLogStreamingService implements Serv
catch (Exception ex) {
throw new IOException("Issue communicating with ZooKeeper: " +
ex.getMessage(), ex);
}
- List<TimestampedMessageParser> parsers = new
ArrayList<TimestampedMessageParser>(oozies.size());
+ List<TimestampedMessageParser> parsers = new
ArrayList<>(oozies.size());
try {
// Create a BufferedReader for getting the logs of each server and
put them in a TimestampedMessageParser
for (ServiceInstance<Map> oozie : oozies) {
@@ -156,8 +157,8 @@ public class ZKXLogStreamingService extends
XLogStreamingService implements Serv
String otherUrl =
oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
String jobId =
logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
try {
- // It's important that we specify ALL_SERVERS_PARAM=false
in the GET request to prevent the other Oozie
- // Server from trying aggregate logs from the other Oozie
servers (and creating an infinite recursion)
+ // It's important that we specify
ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
+ // Server from trying to aggregate logs from the other
Oozie servers (and creating an infinite recursion)
final String url = otherUrl + "/v" +
OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
+ "/" + jobId + "?" +
RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
+ "&" + RestConstants.ALL_SERVER_REQUEST +
"=false"
@@ -198,7 +199,8 @@ public class ZKXLogStreamingService extends
XLogStreamingService implements Serv
}
// Add a message about any servers we couldn't contact
if (!badOozies.isEmpty()) {
- writer.write("Unable to contact the following Oozie Servers
for logs (log information may be incomplete):\n");
+ writer.write("Unable to contact the following Oozie Servers
for logs (log information may be " +
+ "incomplete):\n");
for (String badOozie : badOozies) {
writer.write(" ");
writer.write(badOozie);
@@ -208,26 +210,33 @@ public class ZKXLogStreamingService extends
XLogStreamingService implements Serv
writer.flush();
}
- // If it's just the one server (this server), then we don't need
to do any more processing and can just copy it directly
+ // If it's just the one server (this server), then we don't need
to do any more processing and can just
+ // copy it directly
if (parsers.size() == 1) {
TimestampedMessageParser parser = parsers.get(0);
parser.processRemaining(writer, logStreamer);
}
else {
- // Now that we have a Reader for each server to get the logs
from that server, we have to collate them. Within each
- // server, the logs should already be in the correct order, so
we can take advantage of that. We'll use the
- // BufferedReaders to read the messages from the logs of each
server and put them in order without having to bring
- // every message into memory at the same time.
- TreeMap<String, TimestampedMessageParser> timestampMap = new
TreeMap<String, TimestampedMessageParser>();
- // populate timestampMap with initial values
+ // Now that we have a Reader for each server to get the logs
from that server, we have to collate them.
+ // Within each server, the logs should already be in the
correct order, so we can take advantage of
+ // that. We'll use the BufferedReaders to read the messages
from the logs of each server and put them
+ // in order without having to bring every message into memory
at the same time.
+
+ // The created TreeMap is capable of handling duplicate keys
(timestamps) by maintaining a
+ // collection of values per key. This is important as the
timestamps can be exactly the same in
+ // multiple Oozie servers which would end up with missing
parsed log messages in case duplicated
+ // keys are not allowed.
+ Map<String, List<TimestampedMessageParser>> timestampTreeMap =
new TreeMap<>();
+
+ // populate timestampTreeMap with initial values
for (TimestampedMessageParser parser : parsers) {
if (parser.increment()) {
- timestampMap.put(parser.getLastTimestamp(), parser);
+ putParser(timestampTreeMap, parser.getLastTimestamp(),
parser);
}
}
- while (timestampMap.size() > 1) {
+ while (getParserAmount(timestampTreeMap) > 1) {
// The first entry will be the earliest based on the
timestamp (also removes it) from the map
- TimestampedMessageParser earliestParser =
timestampMap.pollFirstEntry().getValue();
+ TimestampedMessageParser earliestParser =
pollFirstParser(timestampTreeMap);
// Write the message from that parser at that timestamp
writer.write(StringEscapeUtils.escapeHtml4(earliestParser.getLastMessage()));
if
(logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
@@ -236,12 +245,13 @@ public class ZKXLogStreamingService extends
XLogStreamingService implements Serv
// Increment that parser to read the next message
if (earliestParser.increment()) {
// If it still has messages left, put it back in the
map with the new last timestamp for it
- timestampMap.put(earliestParser.getLastTimestamp(),
earliestParser);
+ putParser(timestampTreeMap,
earliestParser.getLastTimestamp(), earliestParser);
}
}
- // If there's only one parser left in the map, then we can
simply copy the rest of its lines directly to be faster
- if (timestampMap.size() == 1) {
- TimestampedMessageParser parser =
timestampMap.values().iterator().next();
+ // If there's only one parser left in the map, then we can
simply copy the rest of its lines directly
+ // to be faster
+ if (getParserAmount(timestampTreeMap) == 1) {
+ TimestampedMessageParser parser =
timestampTreeMap.values().iterator().next().get(0);
// don't forget the last message read by the parser
writer.write(StringEscapeUtils.escapeHtml4(parser.getLastMessage()));
parser.processRemaining(writer, logStreamer);
@@ -254,4 +264,50 @@ public class ZKXLogStreamingService extends
XLogStreamingService implements Serv
}
}
}
+
+ /**
+ * Retrieves and removes the first parser of the first entry (in iteration
order) of the provided TreeMap.
+ *
+ * @param timestampTreeMap the TreeMap from which the parser is to be
polled; the entry itself is removed only when this was its last parser
+ * @return the first parser element from the provided TreeMap
+ */
+ private TimestampedMessageParser pollFirstParser(Map<String,
List<TimestampedMessageParser>> timestampTreeMap) {
+ Iterator<Map.Entry<String, List<TimestampedMessageParser>>> entry =
timestampTreeMap.entrySet().iterator();
+ List<TimestampedMessageParser> entryList = entry.next().getValue();
+
+ TimestampedMessageParser parser = entryList.get(0);
+
+ if (entryList.size() == 1) {
+ entry.remove();
+ } else {
+ entryList.remove(parser);
+ }
+ return parser;
+ }
+
+ /**
+ * Appends the provided TimestampedMessageParser to the list stored under the
given timestamp, creating that list if the timestamp is not yet present.
+ *
+ * @param timestampTreeMap the TreeMap to which the parser is to be
added
+ * @param timestamp timestamp string to be used as the key in the
TreeMap
+ * @param parser the TimestampedMessageParser object to be added to the
TreeMap
+ */
+ private void putParser(Map<String, List<TimestampedMessageParser>>
timestampTreeMap, String timestamp,
+ TimestampedMessageParser parser) {
+ timestampTreeMap.computeIfAbsent(timestamp, (unused) -> new
ArrayList<>()).add(parser);
+ }
+
+ /**
+ * Counts the TimestampedMessageParser elements in the given TreeMap by
summing the sizes of all per-timestamp lists (not the number of keys).
+ *
+ * @param timestampTreeMap the TreeMap to be checked
+ * @return number of TimestampedMessageParser in the given TreeMap
+ */
+ private int getParserAmount(Map<String, List<TimestampedMessageParser>>
timestampTreeMap) {
+ int parserAmount = 0;
+ for (Map.Entry<String, List<TimestampedMessageParser>> entry :
timestampTreeMap.entrySet()) {
+ parserAmount += entry.getValue().size();
+ }
+ return parserAmount;
+ }
}
diff --git
a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index 9119e6c17..4b9cf1b5f 100644
---
a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++
b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -272,6 +272,10 @@ public class TestZKXLogStreamingService extends
ZKXTestCase {
Services.get().get(XLogService.class).getOozieLogName());
logFile.getParentFile().mkdirs();
Writer logWriter = new OutputStreamWriter(new
FileOutputStream(logFile), StandardCharsets.UTF_8);
+
+ // Notice below that the timestamps (2013-06-10 10:25:44,008) of the
first log messages of the "Oozie servers"
+ // are the same. This is intentional and is supposed to test a corner
case in the mechanism of the log streaming
+
// local logs
logWriter.append("2013-06-10 10:25:44,008 WARN HiveActionExecutor:542
SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] "
+ "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W]
ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] "
@@ -285,7 +289,7 @@ public class TestZKXLogStreamingService extends ZKXTestCase
{
logWriter.close();
// logs to be returned by another "Oozie server"
DummyLogStreamingServlet.logs =
- "2013-06-10 10:25:43,575 WARN ActionStartXCommand:542
SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "2013-06-10 10:25:44,008 WARN ActionStartXCommand:542
SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "JOB[0000003-130610102426873-oozie-rkan-W]
ACTION[0000003-130610102426873-oozie-rkan-W@:start:] "
+ "[***0000003-130610102426873-oozie-rkan-W@:start:***]Action
status=DONE _L1_"
+ "\n"
diff --git a/release-log.txt b/release-log.txt
index 909db42ec..2122ccfab 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3666 Oozie log streaming bug when log timestamps are the same on
multiple Oozie servers (jmakai via dionusos)
OOZIE-3661 Oozie cannot handle environment variables with key=value content
(dionusos via asalamon74)
OOZIE-3659 oozieUrl ambiguous port number in TestOozieCLI.java (AlexaD via
dionusos)
OOZIE-3658 Fix TestJMSJobEventListener#testConnectionDrop flakiness again
(dionusos via asalamon74)