This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new f77ef94 Support "rolled" log files in gnmsg (#854)
f77ef94 is described below
commit f77ef94894fbaec70fff94a567b8d462dcaed24b
Author: Blake Bender <[email protected]>
AuthorDate: Wed Aug 25 10:41:02 2021 -0700
Support "rolled" log files in gnmsg (#854)
- With new parameter --rolled, gnmsg will expect a file of the form
<<basename>>-<<number>>.<<ext>>, and will scan the same directory for
files of the same form with incrementing numbers until it runs out. It will
then attempt to open <<basename>>.<<ext>>, which should be the last file
geode-native was logging to at the time of app exit. Output will be a
single consecutive JSON array of message structures, handy for
post-processing.
- Single-file behavior remains the same as before.
---
tools/gnmsg/client_message_decoder.py | 4 +-
tools/gnmsg/command_line.py | 7 +-
tools/gnmsg/gnmsg.py | 160 +++++++++++++++++++++++++---------
tools/gnmsg/server_message_decoder.py | 10 +--
4 files changed, 135 insertions(+), 46 deletions(-)
diff --git a/tools/gnmsg/client_message_decoder.py
b/tools/gnmsg/client_message_decoder.py
index 3e2bdf3..6d9fd30 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -15,6 +15,7 @@
# limitations under the License.
import re
import struct
+import sys
from dateutil import parser
@@ -242,7 +243,8 @@ class ClientMessageDecoder(DecoderBase):
self.request_requires_security_footer(str(send_trace["Type"]))
):
print(
- "ERROR: Security flag is set, but no footer was added
for this message!"
+ "ERROR: Security flag is set, but no footer was added
for this message!",
+ file=sys.stderr,
)
parse_client_message(send_trace, message_bytes)
diff --git a/tools/gnmsg/command_line.py b/tools/gnmsg/command_line.py
index e19a988..04a6044 100755
--- a/tools/gnmsg/command_line.py
+++ b/tools/gnmsg/command_line.py
@@ -41,6 +41,11 @@ def parse_command_line():
"--thread_id", metavar="T", nargs="?", help="Show only messages on
this thread"
)
+ parser.add_argument(
+ "--rolled",
+ action="store_true",
+ help="(optionally) treat file as first in a sequence of rolled log
files, and scan all sequential files.",
+ )
args = parser.parse_args()
if args.file is None:
@@ -48,4 +53,4 @@ def parse_command_line():
parser.print_help()
sys.exit(1)
- return (args.file, args.handshake, args.messages, args.thread_id)
+ return (args.file, args.handshake, args.messages, args.thread_id,
args.rolled)
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index 111966b..999824f 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -15,6 +15,7 @@
# limitations under the License.
import json
import queue
+import os
import re
import sys
import threading
@@ -45,51 +46,54 @@ from server_message_decoder import ServerMessageDecoder
from handshake_decoder import HandshakeDecoder
-def scan_file(filename, dump_handshake, dump_messages, thread_id):
- output_queue = queue.Queue()
- separator = ""
+def scan_opened_file(
+ file,
+ handshake_decoder,
+ client_decoder,
+ server_decoder,
+ output_queue,
+ dump_handshake,
+ dump_messages,
+ thread_id,
+ start_string,
+):
+ separator = start_string
if dump_handshake:
handshake_decoder = HandshakeDecoder(output_queue)
- with open(filename, "rb") as f:
- for line in f:
- handshake_decoder.process_line(line.decode("utf-8"))
- try:
- data = output_queue.get_nowait()
- for key, value in data.items():
- if key == "handshake":
- print(separator + json.dumps(value, indent=2,
default=str))
- separator = ","
- except queue.Empty:
- continue
-
- separator = ""
- client_decoder = ClientMessageDecoder(output_queue)
- server_decoder = ServerMessageDecoder(output_queue)
- print("[")
- with open(filename, "rb") as f:
- for line in f:
- linestr = line.decode("utf-8")
- client_decoder.process_line(linestr)
- server_decoder.process_line(linestr)
+ for line in file:
+ handshake_decoder.process_line(line.decode("utf-8"))
try:
data = output_queue.get_nowait()
for key, value in data.items():
- if key == "message" and dump_messages:
- if thread_id:
- if "tid" in value.keys() and value["tid"] ==
thread_id:
- print(
- separator + json.dumps(value, indent=2,
default=str)
- )
- separator = ","
- else:
+ if key == "handshake":
+ print(separator + json.dumps(value, indent=2,
default=str))
+ separator = ","
+ except queue.Empty:
+ continue
+
+ separator = start_string
+ for line in file:
+ linestr = line.decode("utf-8")
+ client_decoder.process_line(linestr)
+ server_decoder.process_line(linestr)
+ try:
+ data = output_queue.get_nowait()
+ for key, value in data.items():
+ if key == "message" and dump_messages:
+ if thread_id:
+ if "tid" in value.keys() and value["tid"] == thread_id:
print(separator + json.dumps(value, indent=2,
default=str))
separator = ","
+ else:
+ print(separator + json.dumps(value, indent=2,
default=str))
+ separator = ","
+
+ except queue.Empty:
+ continue
+ except:
+ traceback.print_exc()
+ continue
- except queue.Empty:
- continue
- except:
- traceback.print_exc()
- continue
while True:
try:
data = output_queue.get_nowait()
@@ -100,9 +104,87 @@ def scan_file(filename, dump_handshake, dump_messages,
thread_id):
except queue.Empty:
break
+
+def scan_file(filename, dump_handshake, dump_messages, thread_id):
+ print("[")
+
+ output_queue = queue.Queue()
+
+ f = open(filename, "rb")
+ print("Scanning " + filename, file=sys.stderr)
+ scan_opened_file(
+ f,
+ HandshakeDecoder(output_queue),
+ ClientMessageDecoder(output_queue),
+ ServerMessageDecoder(output_queue),
+ output_queue,
+ dump_handshake,
+ dump_messages,
+ thread_id,
+ "",
+ )
+ f.close()
+
print("]")
+def scan_file_sequence(file, handshake, messages, thread_id):
+ dirname = os.path.dirname(file)
+ basename, ext = os.path.splitext(os.path.basename(file))
+
+ base_parts = basename.split("-")
+ if len(base_parts) == 2:
+ print("[")
+
+ root = base_parts[0]
+ roll_index = int(base_parts[1])
+
+ output_queue = queue.Queue()
+ handshake_decoder = HandshakeDecoder(output_queue)
+ client_decoder = ClientMessageDecoder(output_queue)
+ server_decoder = ServerMessageDecoder(output_queue)
+ start_string = ""
+ last_chance = False
+
+ while True:
+ if last_chance:
+ filename = dirname + os.sep + root + ext
+ else:
+ filename = dirname + os.sep + root + "-" + str(roll_index) +
ext
+
+ try:
+ f = open(filename, "rb")
+ print("Scanning " + filename, file=sys.stderr)
+ scan_opened_file(
+ f,
+ handshake_decoder,
+ client_decoder,
+ server_decoder,
+ output_queue,
+ handshake,
+ messages,
+ thread_id,
+ start_string,
+ )
+ start_string = ","
+ f.close()
+ roll_index += 1
+ if last_chance:
+ break
+ except FileNotFoundError:
+ last_chance = True
+ continue
+
+ print("]")
+
+ else:
+ raise ValueError(basename + " is not a valid rolled logfile name")
+
+
if __name__ == "__main__":
- (file, handshake, messages, thread_id) = command_line.parse_command_line()
- scan_file(file, handshake, messages, thread_id)
+ (file, handshake, messages, thread_id, rolled) =
command_line.parse_command_line()
+
+ if rolled:
+ scan_file_sequence(file, handshake, messages, thread_id)
+ else:
+ scan_file(file, handshake, messages, thread_id)
diff --git a/tools/gnmsg/server_message_decoder.py
b/tools/gnmsg/server_message_decoder.py
index 135f95b..ba03f40 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -393,9 +393,11 @@ class ServerMessageDecoder(DecoderBase):
if tid in self.chunk_decoders_.keys():
self.chunk_decoders_[tid].add_chunk(parts[3])
if self.chunk_decoders_[tid].is_complete_message():
- self.output_queue_.put(
- {"message":
self.chunk_decoders_[tid].get_decoded_message(parts[0])}
+ receive_trace =
self.chunk_decoders_[tid].get_decoded_message(
+ parts[0]
)
+ receive_trace["tid"] = str(tid)
+ self.output_queue_.put({"message": receive_trace})
self.chunk_decoders_[tid].reset()
else:
return
@@ -418,9 +420,7 @@ class ServerMessageDecoder(DecoderBase):
self.headers_[tid] = last_header
self.connection_states_[tid] =
self.STATE_WAITING_FOR_MESSAGE_BODY_
- elif (
- self.connection_states_[tid] ==
self.STATE_WAITING_FOR_MESSAGE_BODY_
- ):
+ elif self.connection_states_[tid] ==
self.STATE_WAITING_FOR_MESSAGE_BODY_:
if message_body:
receive_trace = self.headers_[tid]
self.headers_[tid] = None