http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/README.txt ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/README.txt b/zookeeper-contrib/zookeeper-contrib-loggraph/README.txt new file mode 100644 index 0000000..8ccaa1c --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/README.txt @@ -0,0 +1,70 @@ +LogGraph README + +1 - About +LogGraph is an application for viewing and filtering zookeeper logs. It can handle transaction logs and message logs. + +2 - Compiling + +Run "ant jar" in zookeeper-contrib/zookeeper-contrib-loggraph/. This will download all dependencies and compile all the loggraph code. + +Once compilation has finished, you can run it with the loggraph.sh script in zookeeper-contrib/zookeeper-contrib-loggraph/src/main/resources. +This will start an embedded web server on your machine. +Navigate to http://localhost:8182/graph/main.html + +3 - Usage +LogGraph presents the user with 4 views: + + a) Simple log view + This view simply displays the log text. This isn't very useful without filters (see "Filtering the logs"). + + b) Server view + The server view shows the interactions between the different servers in an ensemble. The X axis represents time. + * Exceptions show up as red dots. Hovering your mouse over them will give you more details of the exception. + * The colour of the line represents the election state of the server. + - orange means LOOKING for leader + - dark green means the server is the leader + - light green means the server is following a leader + - yellow means there isn't enough information to determine the state of the server. + * The gray arrows denote election messages between servers. Pink dashed arrows are messages that were sent but never delivered. + + c) Session view + The session view shows the lifetime of sessions on a server. Use the time filter to narrow down the view. 
Any more than about 2000 events will take a long time to view in your browser. + The X axis represents time. Each line is a session. The black dots represent events on the session. You can click on the black dots for more details of the event. + + d) Stats view + There is currently only one statistics view, Transactions/minute. Suggestions for other statistics views are very welcome. + +4 - Filtering the logs +The logs can be filtered in 2 ways, by time and by content. + +To filter by time simply move the slider to the desired start time. The time window specifies how many milliseconds after and including the start time will be displayed. + +Content filtering uses an ad hoc filtering language, using prefix notation. The language looks somewhat similar to lisp. A statement in the language takes the form (op arg arg ...). A statement resolves to a boolean value. Statements can be nested. + +4.1 - Filter arguments +An argument can be a number, a string or a symbol. A number is any argument which starts with -, + or 0 to 9. If the number starts with 0x it is interpreted as hexadecimal. Otherwise it is interpreted as decimal. If the argument begins with a double-quote (") it is interpreted as a string. Anything else is interpreted as a symbol. + +4.2 - Filter symbols +The possible filter symbols are: + +client-id : number, the session id of the client who initiated a transaction. +cxid : number, the cxid of a transaction +zxid : number, the zxid of a transaction +operation : string, the operation being performed, for example "setData", "createSession", "closeSession", "error", "create" + +4.3 - Filter operations +The possible filter operations are: + +or : logical or, takes 1 or more arguments which must be other statements. +and : logical and, takes 1 or more arguments which must be other statements. +not : logical not, takes 1 argument which must be another statement. +xor : exclusive or, takes 1 or more arguments which must be other statements. 
+= : equals, takes 1 or more arguments, which must all be equal to each other to return true. +> : greater than, takes 1 or more arguments, to return true the 1st argument must be greater than the 2nd argument which must be greater than the 3rd argument and so on... +< : less than, takes 1 or more arguments, to return true the 1st argument must be less than the 2nd argument which must be less than the 3rd argument and so on... + +4.4 - Filter examples +Give me all the setData operations with session id 0xdeadbeef or 0xcafeb33f but not with zxid 0x12341234 -> + +(and (= operation "setData") (or (= client-id 0xdeadbeef) (= client-id 0xcafeb33f)) (not (= zxid 0x12341234))) +
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/build.xml ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/build.xml b/zookeeper-contrib/zookeeper-contrib-loggraph/build.xml new file mode 100644 index 0000000..11143e7 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/build.xml @@ -0,0 +1,70 @@ +<?xml version="1.0"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project name="loggraph" default="jar"> + + <import file="../build-contrib.xml"/> + + <target name="init" depends="check-contrib,zookeeperbuildcontrib.init" unless="skip.contrib"> + <echo message="contrib: ${name}"/> + <mkdir dir="${build.dir}"/> + <antcall target="init-contrib"/> + </target> + + <target name="compile" depends="init,ivy-retrieve,zookeeperbuildcontrib.compile" unless="skip.contrib"> + </target> + + <target name="setjarname"> + <property name="jarname" value="${build.dir}/zookeeper-${version}-${name}.jar"/> + </target> + + <target name="jar" depends="setjarname,compile" > + <jar destfile="${jarname}"> + <fileset file="${zk.root}/LICENSE.txt" /> + <fileset dir="${build.classes}" /> + <fileset dir="src/main/webapp"/> + <manifest> + <attribute name="Built-By" value="${user.name}"/> + <attribute name="Built-At" value="${build.time}"/> + <attribute name="Built-On" value="${host.name}" /> + <attribute name="Implementation-Title" value="org.apache.zookeeper.graph"/> + <attribute name="Implementation-Version" value="${revision}"/> + <attribute name="Implementation-Vendor" value="The Apache Software Foundation"/> + </manifest> + </jar> + </target> + + <target name="test"> + <echo message="No test target defined for this package" /> + </target> + + + <target name="package" depends="compile, zookeeperbuildcontrib.package" unless="skip.contrib"> + <echo message="contrib: ${name}"/> + + <copy file="${basedir}/build.xml" todir="${dist.dir}/contrib/${name}"/> + + <mkdir dir="${dist.dir}/contrib/${name}/src"/> + <copy todir="${dist.dir}/contrib/${name}/src"> + <fileset dir="${basedir}/src/main"/> + </copy> + + </target> + +</project> http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/ivy.xml ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/ivy.xml b/zookeeper-contrib/zookeeper-contrib-loggraph/ivy.xml new file mode 100644 
index 0000000..e3a1b48 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/ivy.xml @@ -0,0 +1,44 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<ivy-module version="2.0" + xmlns:e="http://ant.apache.org/ivy/extra"> + + <info organisation="org.apache.zookeeper" + module="${name}" revision="${version}"> + <license name="Apache 2.0"/> + <ivyauthor name="Apache ZooKeeper" url="http://zookeeper.apache.org"/> + <description>ZooKeeper Graphing</description> + </info> + + <configurations defaultconfmapping="default"> + <conf name="default"/> + <conf name="test"/> + </configurations> + + <dependencies> + <dependency org="org.slf4j" name="slf4j-api" rev="1.7.5"/> + <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.5" transitive="false"/> + + <!-- transitive false turns off dependency checking, log4j deps seem borked --> + <dependency org="log4j" name="log4j" rev="1.2.17" transitive="false"/> + <dependency org="org.eclipse.jetty" name="jetty-server" rev="9.2.18.v20160721" /> + <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="9.2.18.v20160721" /> + <dependency org="com.googlecode.json-simple" name="json-simple" rev="1.1" /> + </dependencies> + +</ivy-module> 
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterException.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterException.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterException.java new file mode 100644 index 0000000..c0912fa --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterException.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + +public class FilterException extends Exception { + public FilterException(String s) { super(s); } +}; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterOp.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterOp.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterOp.java new file mode 100644 index 0000000..ee73283 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterOp.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + +import java.util.ArrayList; +import org.apache.zookeeper.graph.filterops.*; + +public abstract class FilterOp { + protected ArrayList<FilterOp> subOps; + protected ArrayList<Arg> args; + + public enum ArgType { + STRING, NUMBER, SYMBOL + } + + public FilterOp() { + subOps = new ArrayList<FilterOp>(); + args = new ArrayList<Arg>(); + } + + public static FilterOp newOp(String op) throws FilterException { + if (op.equals("or")) + return new OrOp(); + if (op.equals("and")) + return new AndOp(); + if (op.equals("not")) + return new NotOp(); + if (op.equals("xor")) + return new XorOp(); + if (op.equals("=")) + return new EqualsOp(); + if (op.equals("<")) + return new LessThanOp(); + if (op.equals(">")) + return new GreaterThanOp(); + + throw new FilterException("Invalid operation '"+op+"'"); + } + + public void addSubOp(FilterOp op) { + subOps.add(op); + } + + public void addArg(Arg arg) { + args.add(arg); + } + + public abstract boolean matches(LogEntry entry) throws FilterException; + + public String toString() { + String op = "(" + getClass().getName(); + for (FilterOp f : subOps) { + op += " " + f; + } + for (Arg a : args) { + op += " " + a; + } + return op + ")"; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterParser.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterParser.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterParser.java new file mode 100644 index 0000000..cf12e3a --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/FilterParser.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.io.PushbackReader; +import java.io.StringReader; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.zookeeper.graph.filterops.*; + +public class FilterParser { + private PushbackReader reader; + + public FilterParser(String s) { + reader = new PushbackReader(new StringReader(s)); + } + + private String readUntilSpace() throws IOException { + StringBuffer buffer = new StringBuffer(); + + int c = reader.read(); + while (!Character.isWhitespace(c) && c != ')' && c != '(') { + buffer.append((char)c); + c = reader.read(); + if (c == -1) { + break; + } + } + reader.unread(c); + + return buffer.toString().trim(); + } + + private StringArg readStringArg() throws IOException, FilterException { + int c = reader.read(); + int last = 0; + if (c != '"') { + throw new FilterException("Check the parser, trying to read a string that doesn't begin with quotes"); + } + StringBuffer buffer = new StringBuffer(); + while (reader.ready()) { + last = c; + c = reader.read(); + if (c == -1) { + break; + } + + if (c == '"' && last != '\\') { + return new StringArg(buffer.toString()); + } else { + buffer.append((char)c); + } + } + throw new FilterException("Unterminated string"); + } + + private NumberArg readNumberArg() 
throws IOException, FilterException { + String strval = readUntilSpace(); + + try { + if (strval.startsWith("0x")) { + return new NumberArg(Long.valueOf(strval.substring(2), 16)); + } else { + return new NumberArg(Long.valueOf(strval)); + } + } catch (NumberFormatException e) { + throw new FilterException("Not a number [" + strval + "]\n" + e); + } + } + + private SymbolArg readSymbolArg() throws IOException, FilterException { + return new SymbolArg(readUntilSpace()); + } + + public FilterOp parse() throws IOException, FilterException { + int c = reader.read(); + if (c != '(') { + throw new FilterException("Invalid format"); + } + + String opstr = readUntilSpace(); + FilterOp op = FilterOp.newOp(opstr); + + while (reader.ready()) { + c = reader.read(); + if (c == -1) { + break; + } + if (c == '(') { + reader.unread(c); + op.addSubOp(parse()); + } else if (c == ')') { + return op; + } else if (c == '"') { + reader.unread(c); + op.addArg(readStringArg()); + } else if (Character.isDigit(c) || c == '-' || c == '+') { + reader.unread(c); + op.addArg(readNumberArg()); + } else if (Character.isJavaIdentifierStart(c)) { + reader.unread(c); + op.addArg(readSymbolArg()); + } + } + throw new FilterException("Incomplete filter"); + } + + public static void main(String[] args) throws IOException, FilterException { + if (args.length == 1) { + System.out.println(new FilterParser(args[0]).parse()); + } else { + System.out.println(new FilterParser("(or (and (= session foobar) (= session barfoo)) (= session sdfs))").parse()); + } + } +}; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java 
b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java new file mode 100644 index 0000000..afaf3a1 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + +import java.io.Writer; +import java.io.OutputStreamWriter; +import java.io.IOException; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.ListIterator; + +public class JsonGenerator { + private JSONObject root; + private HashSet<Integer> servers; + + private class Message { + private int from; + private int to; + private long zxid; + + public Message(int from, int to, long zxid) { + this.from = from; + this.to = to; + this.zxid = zxid; + } + + public boolean equals(Message m) { + return (m.from == this.from + && m.to == this.to + && m.zxid == this.zxid); + } + }; + + public JSONObject txnEntry(TransactionEntry e) { + JSONObject event = new JSONObject(); + + event.put("time", Long.toString(e.getTimestamp())); + event.put("client", Long.toHexString(e.getClientId())); + event.put("cxid", Long.toHexString(e.getCxid())); + event.put("zxid", Long.toHexString(e.getZxid())); + event.put("op", e.getOp()); + event.put("extra", e.getExtra()); + event.put("type", "transaction"); + + return event; + } + + /** + Assumes entries are sorted by timestamp. + */ + public JsonGenerator(LogIterator iter) { + servers = new HashSet<Integer>(); + + Pattern stateChangeP = Pattern.compile("- (LOOKING|FOLLOWING|LEADING)"); + Pattern newElectionP = Pattern.compile("New election. 
My id = (\\d+), Proposed zxid = (\\d+)"); + Pattern receivedProposalP = Pattern.compile("Notification: (\\d+) \\(n.leader\\), (\\d+) \\(n.zxid\\), (\\d+) \\(n.round\\), .+ \\(n.state\\), (\\d+) \\(n.sid\\), .+ \\(my state\\)"); + Pattern exceptionP = Pattern.compile("xception"); + + root = new JSONObject(); + Matcher m = null; + JSONArray events = new JSONArray(); + root.put("events", events); + + long starttime = Long.MAX_VALUE; + long endtime = 0; + + int leader = 0; + long curEpoch = 0; + boolean newEpoch = false; + + while (iter.hasNext()) { + LogEntry ent = iter.next(); + + if (ent.getTimestamp() < starttime) { + starttime = ent.getTimestamp(); + } + if (ent.getTimestamp() > endtime) { + endtime = ent.getTimestamp(); + } + + if (ent.getType() == LogEntry.Type.TXN) { + events.add(txnEntry((TransactionEntry)ent)); + } else { + Log4JEntry e = (Log4JEntry)ent; + servers.add(e.getNode()); + + if ((m = stateChangeP.matcher(e.getEntry())).find()) { + JSONObject stateChange = new JSONObject(); + stateChange.put("type", "stateChange"); + stateChange.put("time", e.getTimestamp()); + stateChange.put("server", e.getNode()); + stateChange.put("state", m.group(1)); + events.add(stateChange); + + if (m.group(1).equals("LEADING")) { + leader = e.getNode(); + } + } else if ((m = newElectionP.matcher(e.getEntry())).find()) { + Iterator<Integer> iterator = servers.iterator(); + long zxid = Long.valueOf(m.group(2)); + int count = (int)zxid;// & 0xFFFFFFFFL; + int epoch = (int)Long.rotateRight(zxid, 32);// >> 32; + + if (leader != 0 && epoch > curEpoch) { + JSONObject stateChange = new JSONObject(); + stateChange.put("type", "stateChange"); + stateChange.put("time", e.getTimestamp()); + stateChange.put("server", leader); + stateChange.put("state", "INIT"); + events.add(stateChange); + leader = 0; + } + + while (iterator.hasNext()) { + int dst = iterator.next(); + if (dst != e.getNode()) { + JSONObject msg = new JSONObject(); + msg.put("type", "postmessage"); + msg.put("src", 
e.getNode()); + msg.put("dst", dst); + msg.put("time", e.getTimestamp()); + msg.put("zxid", m.group(2)); + msg.put("count", count); + msg.put("epoch", epoch); + + events.add(msg); + } + } + } else if ((m = receivedProposalP.matcher(e.getEntry())).find()) { + // Pattern.compile("Notification: \\d+, (\\d+), (\\d+), \\d+, [^,]*, [^,]*, (\\d+)");//, LOOKING, LOOKING, 2 + int src = Integer.valueOf(m.group(4)); + long zxid = Long.valueOf(m.group(2)); + int dst = e.getNode(); + long epoch2 = Long.valueOf(m.group(3)); + + int count = (int)zxid;// & 0xFFFFFFFFL; + int epoch = (int)Long.rotateRight(zxid, 32);// >> 32; + + if (leader != 0 && epoch > curEpoch) { + JSONObject stateChange = new JSONObject(); + stateChange.put("type", "stateChange"); + stateChange.put("time", e.getTimestamp()); + stateChange.put("server", leader); + stateChange.put("state", "INIT"); + events.add(stateChange); + leader = 0; + } + + if (src != dst) { + JSONObject msg = new JSONObject(); + msg.put("type", "delivermessage"); + msg.put("src", src); + msg.put("dst", dst); + msg.put("time", e.getTimestamp()); + msg.put("zxid", zxid); + msg.put("epoch", epoch); + msg.put("count", count); + msg.put("epoch2", epoch2); + + events.add(msg); + } + } else if ((m = exceptionP.matcher(e.getEntry())).find()) { + JSONObject ex = new JSONObject(); + ex.put("type", "exception"); + ex.put("server", e.getNode()); + ex.put("time", e.getTimestamp()); + ex.put("text", e.getEntry()); + events.add(ex); + } + } + JSONObject ex = new JSONObject(); + ex.put("type", "text"); + ex.put("time", ent.getTimestamp()); + String txt = ent.toString(); + ex.put("text", txt); + events.add(ex); + } + // System.out.println("pending messages: "+pendingMessages.size()); + root.put("starttime", starttime); + root.put("endtime", endtime); + + JSONArray serversarray = new JSONArray(); + root.put("servers", serversarray); + + Iterator<Integer> iterator = servers.iterator(); + while (iterator.hasNext()) { + serversarray.add(iterator.next()); + } 
+ } + + public String toString() { + return JSONValue.toJSONString(root); + } + + public static void main(String[] args) throws Exception { + MergedLogSource src = new MergedLogSource(args); + LogIterator iter = src.iterator(); + System.out.println(new JsonGenerator(iter)); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JEntry.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JEntry.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JEntry.java new file mode 100644 index 0000000..0edc146 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JEntry.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + +public class Log4JEntry extends LogEntry { + public Log4JEntry(long timestamp, int node, String entry) { + super(timestamp); + setAttribute("log-text", entry); + setAttribute("node", new Integer(node)); + } + + public String getEntry() { + return (String) getAttribute("log-text"); + } + + public String toString() { + return "" + getTimestamp() + "::::" + getNode() + "::::" + getEntry(); + } + + public int getNode() { + return (Integer) getAttribute("node"); + } + + public Type getType() { return LogEntry.Type.LOG4J; } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JSource.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JSource.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JSource.java new file mode 100644 index 0000000..84a9d98 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/Log4JSource.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.io.File; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import java.util.ArrayList; +import java.util.Date; +import java.text.SimpleDateFormat; +import java.text.ParseException; +import java.util.Calendar; +import java.util.GregorianCalendar; + +import java.io.EOFException; +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Log4JSource implements LogSource { + private static final Logger LOG = LoggerFactory.getLogger(Log4JSource.class); + + private static final int skipN = 10000; + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; + + private LogSkipList skiplist = null; + + private String file = null; + private long starttime = 0; + private long endtime = 0; + private int serverid = 0; + private long size = 0; + + private Pattern timep; + + public boolean overlapsRange(long starttime, long endtime) { + return (starttime <= this.endtime && endtime >= this.starttime); + } + + public long size() { return size; } + public long getStartTime() { return starttime; } + public long getEndTime() { return endtime; } + public LogSkipList getSkipList() { return skiplist; } + + private class Log4JSourceIterator implements LogIterator { + private RandomAccessFileReader in; + private LogEntry next = null; + private long starttime = 0; + private long endtime = 0; + private String buf = ""; + private Log4JSource src = null; + private long skippedAtStart = 0; + private SimpleDateFormat dateformat = null; + private FilterOp filter = null; + + public Log4JSourceIterator(Log4JSource src, long starttime, 
long endtime) throws IllegalArgumentException, FilterException { + this(src, starttime, endtime, null); + } + + public Log4JSourceIterator(Log4JSource src, long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException { + + this.dateformat = new SimpleDateFormat(DATE_FORMAT); + this.src = src; + this.starttime = starttime; + this.endtime = endtime; + + File f = new File(src.file); + try { + in = new RandomAccessFileReader(f); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException("Bad file passed in (" + src.file +") cannot open:" + e); + } + + // skip to the offset of latest skip point before starttime + LogSkipList.Mark start = src.getSkipList().findMarkBefore(starttime); + try { + in.seek(start.getBytes()); + skippedAtStart = start.getEntriesSkipped(); + } catch (IOException ioe) { + // if we can't skip, we should just read from the start + } + + LogEntry e; + while ((e = readNextEntry()) != null && e.getTimestamp() < endtime) { + if (e.getTimestamp() >= starttime && (filter == null || filter.matches(e))) { + next = e; + return; + } + skippedAtStart++; + } + this.filter = filter; + } + + synchronized public long size() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("size() called"); + } + + if (this.endtime >= src.getEndTime()) { + return src.size() - skippedAtStart; + } + + long pos = in.getPosition(); + + if (LOG.isTraceEnabled()) { + LOG.trace("saved pos () = " + pos); + } + + LogEntry e; + + LogSkipList.Mark lastseg = src.getSkipList().findMarkBefore(this.endtime); + in.seek(lastseg.getBytes()); + buf = ""; // clear the buf so we don't get something we read before we sought + // number of entries skipped to get to the end of the iterator, less the number skipped to get to the start + long count = lastseg.getEntriesSkipped() - skippedAtStart; + + while ((e = readNextEntry()) != null) { + if (LOG.isTraceEnabled()) { + //LOG.trace(e); + } + if (e.getTimestamp() > this.endtime) { + break; + } 
+ count++; + } + in.seek(pos); + buf = ""; + + if (LOG.isTraceEnabled()) { + LOG.trace("size() = " + count); + } + + return count; + } + + synchronized private LogEntry readNextEntry() { + try { + try { + while (true) { + String line = in.readLine(); + if (line == null) { + break; + } + + Matcher m = src.timep.matcher(line); + if (m.lookingAt()) { + if (buf.length() > 0) { + LogEntry e = new Log4JEntry(src.timestampFromText(dateformat, buf), src.getServerId(), buf); + buf = line; + return e; + } + buf = line; + } else if (buf.length() > 0) { + buf += line + "\n"; + } + } + } catch (EOFException eof) { + // ignore, we've simply come to the end of the file + } + if (buf.length() > 0) { + LogEntry e = new Log4JEntry(src.timestampFromText(dateformat, buf), src.getServerId(), buf); + buf = ""; + return e; + } + } catch (Exception e) { + LOG.error("Error reading next entry in file (" + src.file + "): " + e); + return null; + } + return null; + } + + public boolean hasNext() { + return next != null; + } + + public LogEntry next() throws NoSuchElementException { + LogEntry ret = next; + LogEntry e = readNextEntry(); + + if (filter != null) { + try { + while (e != null && !filter.matches(e)) { + e = readNextEntry(); + } + } catch (FilterException fe) { + throw new NoSuchElementException(e.toString()); + } + } + + if (e != null && e.getTimestamp() < endtime) { + next = e; + } else { + next = null; + } + return ret; + } + + public void remove() throws UnsupportedOperationException { + throw new UnsupportedOperationException("remove not supported for L4J logs"); + } + + public void close() throws IOException { + in.close(); + } + + public String toString() { + String size; + try { + size = new Long(size()).toString(); + } catch (IOException ioe) { + size = "Unable to read"; + } + return "Log4JSourceIterator(start=" + starttime + ", end=" + endtime + ", size=" + size + ")"; + } + } + + public LogIterator iterator(long starttime, long endtime) throws IllegalArgumentException { + 
try { + return iterator(starttime, endtime, null); + } catch (FilterException fe) { + assert(false); //"This should never happen, you can't have a filter exception without a filter"); + return null; + } + } + + public LogIterator iterator(long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException{ + // sanitise start and end times + if (endtime < starttime) { + throw new IllegalArgumentException("End time (" + endtime + ") must be greater or equal to starttime (" + starttime + ")"); + } + + return new Log4JSourceIterator(this, starttime, endtime, filter); + } + + public LogIterator iterator() throws IllegalArgumentException { + return iterator(starttime, endtime+1); + } + + public Log4JSource(String file) throws IOException { + this.file=file; + + timep = Pattern.compile("^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})"); + skiplist = new LogSkipList(); + init(); + } + + private static long timestampFromText(SimpleDateFormat format, String s) { + Date d = null; + try { + d = format.parse(s); + } catch (ParseException e) { + return 0; + } + Calendar c = new GregorianCalendar(); + c.setTime(d); + return c.getTimeInMillis(); + } + + private void init() throws IOException { + File f = new File(file); + RandomAccessFileReader in = new RandomAccessFileReader(f); + SimpleDateFormat dateformat = new SimpleDateFormat(DATE_FORMAT); + Pattern idp = Pattern.compile("\\[myid:(\\d+)\\]"); + + long lastFp = in.getPosition(); + String line = in.readLine(); + Matcher m = null; + + // if we have read data from the file, and it matchs the timep pattern + if ((line != null) && (m = timep.matcher(line)).lookingAt()) { + starttime = timestampFromText(dateformat, m.group(1)); + } else { + throw new IOException("Invalid log4j format. First line doesn't start with time"); + } + + /* + Count number of log entries. 
Any line starting with a timestamp counts as an entry + */ + String lastentry = line; + try { + while (line != null) { + m = timep.matcher(line); + if (m.lookingAt()) { + if (size % skipN == 0) { + long time = timestampFromText(dateformat, m.group(1)); + skiplist.addMark(time, lastFp, size); + } + size++; + lastentry = line; + } + if (serverid == 0 && (m = idp.matcher(line)).find()) { + serverid = Integer.valueOf(m.group(1)); + } + + lastFp = in.getPosition(); + line = in.readLine(); + } + } catch (EOFException eof) { + // ignore, simply end of file, though really (line!=null) should have caught this + } finally { + in.close(); + } + + m = timep.matcher(lastentry); + if (m.lookingAt()) { + endtime = timestampFromText(dateformat, m.group(1)); + } else { + throw new IOException("Invalid log4j format. Last line doesn't start with time"); + } + } + + public String toString() { + return "Log4JSource(file=" + file + ", size=" + size + ", start=" + starttime + ", end=" + endtime +", id=" + serverid +")"; + } + + public static void main(String[] args) throws IOException { + final Log4JSource s = new Log4JSource(args[0]); + System.out.println(s); + + LogIterator iter; + + if (args.length == 3) { + final long starttime = Long.valueOf(args[1]); + final long endtime = Long.valueOf(args[2]); + iter = s.iterator(starttime, endtime); + + Thread t1 = new Thread() { public void run () { + + LogIterator iter = s.iterator(starttime, endtime); + System.out.println(iter); + try { + iter.close(); + } catch (IOException ioe) { + System.out.println(ioe.getMessage()); + } + }; }; + Thread t2 = new Thread() { public void run () { + + LogIterator iter = s.iterator(starttime, endtime); + System.out.println(iter); + try { + iter.close(); + } catch (IOException ioe) { + System.out.println(ioe.getMessage()); + } + }; }; + Thread t3 = new Thread() { public void run () { + + LogIterator iter = s.iterator(starttime, endtime); + System.out.println(iter); + }; }; + t1.start(); + t2.start(); + // 
t3.start(); + } else { + iter = s.iterator(); + } + + /*while (iter.hasNext()) { + System.out.println(iter.next()); + }*/ + iter.close(); + } + + public int getServerId() { + return serverid; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogEntry.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogEntry.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogEntry.java new file mode 100644 index 0000000..a8252eb --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogEntry.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Base class for a single entry read from a log source.
 *
 * <p>An entry is a bag of named attributes. The constructor records the
 * entry's timestamp under the {@code "timestamp"} key; further attributes are
 * added with {@link #setAttribute(String, Object)}.
 */
public abstract class LogEntry implements Serializable {
    /** Attribute key under which the entry timestamp is stored. */
    private static final String TIMESTAMP_KEY = "timestamp";

    // Deliberately not final: modifiers of non-static fields feed the default
    // serialVersionUID, and this class does not declare an explicit one.
    private HashMap<String, Object> attributes;

    /** Kind of log an entry originates from. */
    public enum Type { UNKNOWN, LOG4J, TXN }

    /**
     * Creates an entry and stores {@code timestamp} as its "timestamp" attribute.
     *
     * @param timestamp the entry's timestamp value
     */
    public LogEntry(long timestamp) {
        attributes = new HashMap<String, Object>();
        // Still routed through setAttribute (as before) so subclasses that
        // override it keep observing the initial timestamp write.
        setAttribute(TIMESTAMP_KEY, Long.valueOf(timestamp));
    }

    /**
     * @return the value stored under the "timestamp" attribute
     */
    public long getTimestamp() {
        return (Long) getAttribute(TIMESTAMP_KEY);
    }

    /**
     * @return the kind of log this entry was read from
     */
    public abstract Type getType();

    /**
     * Stores {@code v} under {@code key}, replacing any previous value.
     *
     * @param key attribute name
     * @param v   attribute value
     */
    public void setAttribute(String key, Object v) {
        attributes.put(key, v);
    }

    /**
     * @param key attribute name
     * @return the value stored under {@code key}, or {@code null} if there is none
     */
    public Object getAttribute(String key) {
        return attributes.get(key);
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.io.Closeable; +import java.util.Iterator; +import java.io.IOException; + +public interface LogIterator extends Iterator<LogEntry>, Closeable { + long size() throws IOException;; +}; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogServer.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogServer.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogServer.java new file mode 100644 index 0000000..5cffcdd --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogServer.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.ServletException; + +import java.io.IOException; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; + +import org.apache.zookeeper.graph.servlets.*; + +public class LogServer extends ServletContextHandler { + public LogServer(MergedLogSource src) throws Exception { + super(ServletContextHandler.SESSIONS); + setContextPath("/"); + + addServlet(new ServletHolder(new StaticContent()),"/graph/*"); + + addServlet(new ServletHolder(new Fs()),"/fs"); + addServlet(new ServletHolder(new GraphData(src)), "/data"); + addServlet(new ServletHolder(new FileLoader(src)), "/loadfile"); + addServlet(new ServletHolder(new NumEvents(src)), "/info"); + addServlet(new ServletHolder(new Throughput(src)), "/throughput"); + } + + public static void main(String[] args) { + try { + MergedLogSource src = new MergedLogSource(args); + System.out.println(src); + + Server server = new Server(8182); + server.setHandler(new LogServer(src)); + + server.start(); + server.join(); + + } catch (Exception e) { + // Something is wrong. 
+ e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSkipList.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSkipList.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSkipList.java new file mode 100644 index 0000000..e744442 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSkipList.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.util.List; +import java.util.LinkedList; +import java.util.NoSuchElementException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** +Generic skip list for holding a rough index of a log file. When the log file is loaded, this +index is built by adding a mark every n entries. Then when a specific time position is requested +from the file, a point at most n-1 entries before the time position can be jumped to. 
+ +*/ +public class LogSkipList { + private static final Logger LOG = LoggerFactory.getLogger(LogSkipList.class); + + private LinkedList<Mark> marks; + + public class Mark { + private long time; + private long bytes; + private long skipped; + + public Mark(long time, long bytes, long skipped) { + this.time = time; + this.bytes = bytes; + this.skipped = skipped; + } + + public long getTime() { return this.time; } + public long getBytes() { return this.bytes; } + public long getEntriesSkipped() { return this.skipped; } + + public String toString() { + return "Mark(time=" + time + ", bytes=" + bytes + ", skipped=" + skipped + ")"; + } + }; + + public LogSkipList() { + if (LOG.isTraceEnabled()) { + LOG.trace("New skip list"); + } + marks = new LinkedList<Mark>(); + } + + public void addMark(long time, long bytes, long skipped) { + if (LOG.isTraceEnabled()) { + LOG.trace("addMark (time:" + time + ", bytes: " + bytes + ", skipped: " + skipped + ")"); + } + marks.add(new Mark(time, bytes, skipped)); + } + + /** + Find the last mark in the skip list before time. 
+ */ + public Mark findMarkBefore(long time) throws NoSuchElementException { + if (LOG.isTraceEnabled()) { + LOG.trace("findMarkBefore(" + time + ")"); + } + + Mark last = marks.getFirst(); + for (Mark m: marks) { + if (m.getTime() > time) { + break; + } + last = m; + } + + if (LOG.isTraceEnabled()) { + LOG.trace("return " + last ); + } + + return last; + } + +}; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSource.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSource.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSource.java new file mode 100644 index 0000000..9845c7f --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/LogSource.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; +import java.util.Iterator; + +public interface LogSource extends Iterable<LogEntry> { + public LogIterator iterator(long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException; + + public LogIterator iterator(long starttime, long endtime) throws IllegalArgumentException; + + public LogIterator iterator() throws IllegalArgumentException; + + public boolean overlapsRange(long starttime, long endtime); + + public long size(); + public long getStartTime(); + public long getEndTime(); +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MeasureThroughput.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MeasureThroughput.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MeasureThroughput.java new file mode 100644 index 0000000..1c83da7 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MeasureThroughput.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.io.IOException; +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.DataOutputStream; +import java.io.PrintStream; + +import java.util.HashSet; + +public class MeasureThroughput { + private static final int MS_PER_SEC = 1000; + private static final int MS_PER_MIN = MS_PER_SEC*60; + private static final int MS_PER_HOUR = MS_PER_MIN*60; + + public static void main(String[] args) throws IOException { + MergedLogSource source = new MergedLogSource(args); + + PrintStream ps_ms = new PrintStream(new BufferedOutputStream(new FileOutputStream("throughput-ms.out"))); + PrintStream ps_sec = new PrintStream(new BufferedOutputStream(new FileOutputStream("throughput-sec.out"))); + PrintStream ps_min = new PrintStream(new BufferedOutputStream(new FileOutputStream("throughput-min.out"))); + PrintStream ps_hour = new PrintStream(new BufferedOutputStream(new FileOutputStream("throughput-hour.out"))); + LogIterator iter; + + System.out.println(source); + iter = source.iterator(); + long currentms = 0; + long currentsec = 0; + long currentmin = 0; + long currenthour = 0; + HashSet<Long> zxids_ms = new HashSet<Long>(); + long zxid_sec = 0; + long zxid_min = 0; + long zxid_hour = 0; + + while (iter.hasNext()) { + LogEntry e = iter.next(); + TransactionEntry cxn = (TransactionEntry)e; + + long ms = cxn.getTimestamp(); + long sec = ms/MS_PER_SEC; + long min = ms/MS_PER_MIN; + long hour = ms/MS_PER_HOUR; + + if (currentms != ms && currentms != 0) { + ps_ms.println("" + currentms + " " + zxids_ms.size()); + + zxid_sec += zxids_ms.size(); + zxid_min += zxids_ms.size(); + zxid_hour += zxids_ms.size(); + zxids_ms.clear(); + } + + if (currentsec != sec && currentsec != 0) { + ps_sec.println("" + currentsec*MS_PER_SEC + " " + zxid_sec); + + zxid_sec = 0; + } + + if (currentmin != min && currentmin 
!= 0) { + ps_min.println("" + currentmin*MS_PER_MIN + " " + zxid_min); + + zxid_min = 0; + } + + if (currenthour != hour && currenthour != 0) { + ps_hour.println("" + currenthour*MS_PER_HOUR + " " + zxid_hour); + + zxid_hour = 0; + } + + currentms = ms; + currentsec = sec; + currentmin = min; + currenthour = hour; + + zxids_ms.add(cxn.getZxid()); + } + + iter.close(); + ps_ms.close(); + ps_sec.close(); + ps_min.close(); + ps_hour.close(); + } +}; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MergedLogSource.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MergedLogSource.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MergedLogSource.java new file mode 100644 index 0000000..bb789d3 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/MergedLogSource.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.FileInputStream; +import java.io.IOException; +import java.text.DateFormat; +import java.util.Date; +import java.util.zip.Adler32; +import java.util.zip.Checksum; +import java.util.HashMap; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.server.TraceFormatter; +import org.apache.zookeeper.server.persistence.FileHeader; +import org.apache.zookeeper.server.persistence.FileTxnLog; +import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.txn.TxnHeader; + +import org.apache.zookeeper.ZooDefs.OpCode; + +import org.apache.zookeeper.txn.CreateSessionTxn; +import org.apache.zookeeper.txn.CreateTxn; +import org.apache.zookeeper.txn.DeleteTxn; +import org.apache.zookeeper.txn.ErrorTxn; +import org.apache.zookeeper.txn.SetACLTxn; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnHeader; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.util.Vector; +import java.util.Iterator; +import java.util.Collections; +import java.util.NoSuchElementException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergedLogSource implements LogSource { + private static final Logger LOG = LoggerFactory.getLogger(MergedLogSource.class); + private Vector<LogSource> sources = null; + private long starttime = 0; + private long endtime = 0; + private long size = 0; + + public boolean overlapsRange(long starttime, long endtime) { + return (starttime <= this.endtime && endtime >= this.starttime); + } + + public long size() { return size; } + public long getStartTime() { return starttime; } + public long getEndTime() { return endtime; } + + private class MergedLogSourceIterator implements LogIterator { + private LogEntry next = null; + private long start = 0; + 
private long end = 0; + private MergedLogSource src = null; + private LogIterator[] sources = null; + private LogEntry[] nexts = null; + private FilterOp filter = null; + + public MergedLogSourceIterator(MergedLogSource src, long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException { + Vector<LogIterator> iters = new Vector<LogIterator>(); + for (LogSource s : src.sources) { + if (s.overlapsRange(starttime, endtime)) { + iters.add(s.iterator(starttime, endtime, filter)); + } + } + + sources = new LogIterator[iters.size()]; + sources = iters.toArray(sources); + nexts = new LogEntry[iters.size()]; + for (int i = 0; i < sources.length; i++) { + if (sources[i].hasNext()) + nexts[i] = sources[i].next(); + } + this.filter = filter; + } + + public MergedLogSourceIterator(MergedLogSource src, long starttime, long endtime) throws IllegalArgumentException, FilterException { + this(src, starttime, endtime, null); + } + + public long size() throws IOException { + long size = 0; + for (LogIterator i : sources) { + size += i.size(); + } + return size; + } + + public boolean hasNext() { + for (LogEntry n : nexts) { + if (n != null) return true; + } + return false; + } + + public LogEntry next() { + int min = -1; + for (int i = 0; i < nexts.length; i++) { + if (nexts[i] != null) { + if (min == -1) { + min = i; + } else if (nexts[i].getTimestamp() < nexts[min].getTimestamp()) { + min = i; + } + } + } + if (min == -1) { + return null; + } else { + LogEntry e = nexts[min]; + nexts[min] = sources[min].next(); + return e; + } + } + + public void remove() throws UnsupportedOperationException { + throw new UnsupportedOperationException("remove not supported for Merged logs"); + } + + public void close() throws IOException { + for (LogIterator i : sources) { + i.close(); + } + } + } + + public LogIterator iterator(long starttime, long endtime) throws IllegalArgumentException { + try { + return iterator(starttime, endtime, null); + } catch 
(FilterException fe) { + assert(false); // shouldn't happen without filter + return null; + } + } + + public LogIterator iterator(long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException { + // sanitise start and end times + if (endtime < starttime) { + throw new IllegalArgumentException("End time (" + endtime + ") must be greater or equal to starttime (" + starttime + ")"); + } + + return new MergedLogSourceIterator(this, starttime, endtime, filter); + } + + public LogIterator iterator() throws IllegalArgumentException { + return iterator(starttime, endtime+1); + } + + public MergedLogSource(String[] files) throws IOException { + sources = new Vector<LogSource>(); + for (String f : files) { + addSource(f); + } + } + + public void addSource(String f) throws IOException { + LogSource s = null; + if (TxnLogSource.isTransactionFile(f)) { + s = new TxnLogSource(f); + } else { + s = new Log4JSource(f); + } + + size += s.size(); + endtime = s.getEndTime() > endtime ? s.getEndTime() : endtime; + starttime = s.getStartTime() < starttime || starttime == 0 ? 
s.getStartTime() : starttime; + sources.add(s); + } + + public String toString() { + String s = "MergedLogSource(size=" + size + ", start=" + starttime + ", end=" + endtime +")"; + for (LogSource src : sources) { + s += "\n\t- " +src; + } + return s; + } + + public static void main(String[] args) throws IOException { + System.out.println("Time: " + System.currentTimeMillis()); + MergedLogSource s = new MergedLogSource(args); + System.out.println(s); + + LogIterator iter; + + iter = s.iterator(); + System.out.println("Time: " + System.currentTimeMillis()); + System.out.println("Iterator Size: " + iter.size()); + System.out.println("Time: " + System.currentTimeMillis()); + /* while (iter.hasNext()) { + System.out.println(iter.next()); + }*/ + iter.close(); + System.out.println("Time: " + System.currentTimeMillis()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/RandomAccessFileReader.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/RandomAccessFileReader.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/RandomAccessFileReader.java new file mode 100644 index 0000000..13a41a5 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/RandomAccessFileReader.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.io.File; +import java.io.Reader; +import java.io.IOException; +import java.io.EOFException; +import java.io.RandomAccessFile; +import java.io.FileNotFoundException; + +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInput; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RandomAccessFileReader extends Reader implements DataInput { + private static final Logger LOG = LoggerFactory.getLogger(RandomAccessFileReader.class); + private RandomAccessFile file; + private byte[] buffer; + private int buffersize; + private int bufferoffset; + private long fileoffset; + private long fp; + + private static final int DEFAULT_BUFFER_SIZE = 512*1024; // 512k + private int point = 0; + + public RandomAccessFileReader(File f) throws FileNotFoundException { + file = new RandomAccessFile(f, "r"); + if (LOG.isDebugEnabled()) { + try { + LOG.debug("Opened file(" + f + ") with FD (" + file.getFD() + ")"); + } catch (IOException ioe) { + LOG.debug("Opened file(" + f + ") coulds get FD"); + } + } + + buffer = new byte[DEFAULT_BUFFER_SIZE]; + buffersize = 0; + bufferoffset = 0; + fileoffset = 0; + fp = 0; + } + + /** + fill the buffer from the file. + fp keeps track of the file pointer. + fileoffset is the offset into the file to where the buffer came from. 
+ */ + private int fill() throws IOException { + fileoffset = fp; + int read = file.read(buffer, 0, buffer.length); + + if (LOG.isDebugEnabled()) { + String buf = new String(buffer, 0, 40, "UTF-8"); + LOG.debug("fill(buffer=" + buf + ")"); + } + + if (read == -1) { // eof reached + buffersize = 0; + } else { + buffersize = read; + } + fp += buffersize; + bufferoffset = 0; + + return buffersize; + } + + /** + * Reader interface + */ + public boolean markSupported() { return false; } + + /** + copy what we can from buffer. if it's not enough, fill buffer again and copy again + */ + synchronized public int read(char[] cbuf, int off, int len) throws IOException { + // This could be faster, but probably wont be used + byte[] b = new byte[2]; + int bytesread = 0; + while (len > 0) { + int read = read(b, 0, 2); + bytesread += read; + if (read < 2) { + return bytesread; + } + cbuf[off] = (char)((b[0] << 8) | (b[1] & 0xff)); + off += read; + len -= read; + } + + return bytesread; + } + + synchronized public int read(byte[] buf, int off, int len) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read(buf, off=" + off + ", len=" + len); + } + + int read = 0; + while (len > 0) { + if (buffersize == 0) { + fill(); + if (buffersize == 0) { + break; + } + } + + int tocopy = Math.min(len, buffersize); + if (LOG.isTraceEnabled()) { + LOG.trace("tocopy=" + tocopy); + } + + System.arraycopy(buffer, bufferoffset, buf, off, tocopy); + buffersize -= tocopy; + bufferoffset += tocopy; + + len -= tocopy; + read += tocopy; + off += tocopy; + } + if (LOG.isTraceEnabled()) { + LOG.trace("read=" + read); + } + + return read; + } + + public void close() throws IOException { + file.close(); + } + + /** + * Seek interface + */ + public long getPosition() { + return bufferoffset + fileoffset; + } + + synchronized public void seek(long pos) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("seek(" + pos + ")"); + } + file.seek(pos); + fp = pos; + buffersize = 0; // force 
a buffer fill on next read + } + + /** + works like the usual readLine but disregards \r to make things easier + */ + synchronized public String readLine() throws IOException { + StringBuffer s = null; + + // go through buffer until i find a \n, if i reach end of buffer first, put whats in buffer into string buffer, + // repeat + buffering: + for (;;) { + if (buffersize == 0) { + fill(); + if (buffersize == 0) { + break; + } + } + + for (int i = 0; i < buffersize; i++) { + if (buffer[bufferoffset + i] == '\n') { + if (i > 0) { // if \n is first char in buffer, leave the string buffer empty + if (s == null) { s = new StringBuffer(); } + s.append(new String(buffer, bufferoffset, i, "UTF-8")); + } + bufferoffset += i+1; + buffersize -= i+1; + break buffering; + } + } + + // We didn't find \n, read the whole buffer into string buffer + if (s == null) { s = new StringBuffer(); } + s.append(new String(buffer, bufferoffset, buffersize, "UTF-8")); + buffersize = 0; + } + + if (s == null) { + return null; + } else { + return s.toString(); + } + } + + /** + DataInput interface + */ + public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + public void readFully(byte[] b, int off, int len) throws IOException + { + while (len > 0) { + int read = read(b, off, len); + len -= read; + off += read; + + if (read == 0) { + throw new EOFException("End of file reached"); + } + } + } + + public int skipBytes(int n) throws IOException { + seek(getPosition() + n); + return n; + } + + public boolean readBoolean() throws IOException { + return (readByte() != 0); + } + + public byte readByte() throws IOException { + byte[] b = new byte[1]; + readFully(b, 0, 1); + return b[0]; + } + + public int readUnsignedByte() throws IOException { + return (int)readByte(); + } + + public short readShort() throws IOException { + byte[] b = new byte[2]; + readFully(b, 0, 2); + return (short)((b[0] << 8) | (b[1] & 0xff)); + } + + public int readUnsignedShort() throws 
IOException { + byte[] b = new byte[2]; + readFully(b, 0, 2); + return (((b[0] & 0xff) << 8) | (b[1] & 0xff)); + } + + public char readChar() throws IOException { + return (char)readShort(); + } + + public int readInt() throws IOException { + byte[] b = new byte[4]; + readFully(b, 0, 4); + return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) | ((b[2] & 0xff) << 8) | (b[3] & 0xff)); + } + + public long readLong() throws IOException { + byte[] b = new byte[8]; + readFully(b, 0, 8); + + return (((long)(b[0] & 0xff) << 56) | ((long)(b[1] & 0xff) << 48) | + ((long)(b[2] & 0xff) << 40) | ((long)(b[3] & 0xff) << 32) | + ((long)(b[4] & 0xff) << 24) | ((long)(b[5] & 0xff) << 16) | + ((long)(b[6] & 0xff) << 8) | ((long)(b[7] & 0xff))); + } + + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + public String readUTF() throws IOException { + int len = readUnsignedShort(); + byte[] bytes = new byte[len+2]; + bytes[0] = (byte)((len >> 8) & 0xFF); + bytes[1] = (byte)(len & 0xFF); + readFully(bytes, 2, len); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); + return dis.readUTF(); + } + + public static void main(String[] args) throws IOException { + RandomAccessFileReader f = new RandomAccessFileReader(new File(args[0])); + + long pos0 = f.getPosition(); + for (int i = 0; i < 5; i++) { + System.out.println(f.readLine()); + } + System.out.println("============="); + long pos1 = f.getPosition(); + System.out.println("pos: " + pos1); + for (int i = 0; i < 5; i++) { + System.out.println(f.readLine()); + } + System.out.println("============="); + f.seek(pos1); + for (int i = 0; i < 5; i++) { + System.out.println(f.readLine()); + } + System.out.println("============="); + f.seek(pos0); + for (int i = 0; i < 5; i++) { + System.out.println(f.readLine()); + } + long pos2 = f.getPosition(); + 
System.out.println("============="); + System.out.println(f.readLine()); + f.seek(pos2); + System.out.println(f.readLine()); + f.close(); + } +}; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TransactionEntry.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TransactionEntry.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TransactionEntry.java new file mode 100644 index 0000000..33c7189 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TransactionEntry.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.graph; + +public class TransactionEntry extends LogEntry { + public TransactionEntry(long timestamp, long clientId, long Cxid, long Zxid, String op) { + this(timestamp, clientId, Cxid, Zxid, op, ""); + } + + public TransactionEntry(long timestamp, long clientId, long Cxid, long Zxid, String op, String extra) { + super(timestamp); + setAttribute("client-id", new Long(clientId)); + setAttribute("cxid", new Long(Cxid)); + setAttribute("zxid", new Long(Zxid)); + setAttribute("operation", op); + setAttribute("extra", extra); + } + + public long getClientId() { + return (Long)getAttribute("client-id"); + } + + public long getCxid() { + return (Long)getAttribute("cxid"); + } + + public long getZxid() { + return (Long)getAttribute("zxid"); + } + + public String getOp() { + return (String)getAttribute("operation"); + } + + public String getExtra() { + return (String)getAttribute("extra"); + } + + public String toString() { + return getTimestamp() + ":::session(0x" + Long.toHexString(getClientId()) + ") cxid(0x" + Long.toHexString(getCxid()) + ") zxid(0x" + Long.toHexString(getZxid()) + ") op(" + getOp() + ") extra(" + getExtra() +")"; + } + + public Type getType() { return LogEntry.Type.TXN; } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java new file mode 100644 index 0000000..809c455 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/TxnLogSource.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.graph; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.FileInputStream; +import java.io.IOException; +import java.text.DateFormat; +import java.util.Date; +import java.util.zip.Adler32; +import java.util.zip.Checksum; +import java.util.HashMap; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.server.TraceFormatter; +import org.apache.zookeeper.server.persistence.FileHeader; +import org.apache.zookeeper.server.persistence.FileTxnLog; +import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.txn.TxnHeader; + +import org.apache.zookeeper.ZooDefs.OpCode; + +import org.apache.zookeeper.txn.CreateSessionTxn; +import org.apache.zookeeper.txn.CreateTxn; +import org.apache.zookeeper.txn.DeleteTxn; +import org.apache.zookeeper.txn.ErrorTxn; +import org.apache.zookeeper.txn.SetACLTxn; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnHeader; + +import java.io.File; +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +public class TxnLogSource implements LogSource { + private static final Logger LOG = LoggerFactory.getLogger(TxnLogSource.class); + + private LogSkipList skiplist = null; + private static final int skipN = 10000; + + private String file = null; + private long starttime = 0; + private long endtime = 0; + private long size = 0; + + public boolean overlapsRange(long starttime, long endtime) { + return (starttime <= this.endtime && endtime >= this.starttime); + } + + public long size() { return size; } + public long getStartTime() { return starttime; } + public long getEndTime() { return endtime; } + public LogSkipList getSkipList() { return skiplist; } + + public static boolean isTransactionFile(String file) throws IOException { + RandomAccessFileReader reader = new RandomAccessFileReader(new File(file)); + BinaryInputArchive logStream = new BinaryInputArchive(reader); + FileHeader fhdr = new FileHeader(); + fhdr.deserialize(logStream, "fileheader"); + reader.close(); + + return fhdr.getMagic() == FileTxnLog.TXNLOG_MAGIC; + } + + private class TxnLogSourceIterator implements LogIterator { + private LogEntry next = null; + private long starttime = 0; + private long endtime = 0; + private TxnLogSource src = null; + private RandomAccessFileReader reader = null; + private BinaryInputArchive logStream = null; + private long skippedAtStart = 0; + private FilterOp filter = null; + + public TxnLogSourceIterator(TxnLogSource src, long starttime, long endtime) throws IllegalArgumentException, FilterException { + this(src,starttime,endtime,null); + } + + public TxnLogSourceIterator(TxnLogSource src, long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException { + try { + this.src = src; + this.starttime = starttime; + this.endtime = endtime; + reader = new RandomAccessFileReader(new File(src.file)); + logStream = new BinaryInputArchive(reader); + FileHeader fhdr = new FileHeader(); + fhdr.deserialize(logStream, 
"fileheader"); + } catch (Exception e) { + throw new IllegalArgumentException("Cannot open transaction log ("+src.file+") :" + e); + } + + LogSkipList.Mark start = src.getSkipList().findMarkBefore(starttime); + try { + reader.seek(start.getBytes()); + skippedAtStart = start.getEntriesSkipped(); + } catch (IOException ioe) { + // if we can't skip, we should just read from the start + } + + this.filter = filter; + + LogEntry e; + while ((e = readNextEntry()) != null && e.getTimestamp() < endtime) { + if (e.getTimestamp() >= starttime && (filter == null || filter.matches(e)) ) { + next = e; + return; + } + skippedAtStart++; + } + + + } + + public long size() throws IOException { + if (this.endtime >= src.getEndTime()) { + return src.size() - skippedAtStart; + } + + long pos = reader.getPosition(); + LogEntry e; + + LogSkipList.Mark lastseg = src.getSkipList().findMarkBefore(this.endtime); + reader.seek(lastseg.getBytes()); + // number of entries skipped to get to the end of the iterator, less the number skipped to get to the start + long count = lastseg.getEntriesSkipped() - skippedAtStart; + + while ((e = readNextEntry()) != null) { + if (e.getTimestamp() > this.endtime) { + break; + } + count++; + } + reader.seek(pos);; + + return count; + } + + private LogEntry readNextEntry() { + LogEntry e = null; + try { + long crcValue; + byte[] bytes; + try { + crcValue = logStream.readLong("crcvalue"); + + bytes = logStream.readBuffer("txnEntry"); + } catch (EOFException ex) { + return null; + } + + if (bytes.length == 0) { + return null; + } + Checksum crc = new Adler32(); + crc.update(bytes, 0, bytes.length); + if (crcValue != crc.getValue()) { + throw new IOException("CRC doesn't match " + crcValue + + " vs " + crc.getValue()); + } + TxnHeader hdr = new TxnHeader(); + Record r = SerializeUtils.deserializeTxn(bytes, hdr); + + switch (hdr.getType()) { + case OpCode.createSession: { + e = new TransactionEntry(hdr.getTime(), hdr.getClientId(), hdr.getCxid(), hdr.getZxid(), 
"createSession"); + } + break; + case OpCode.closeSession: { + e = new TransactionEntry(hdr.getTime(), hdr.getClientId(), hdr.getCxid(), hdr.getZxid(), "closeSession"); + } + break; + case OpCode.create: + if (r != null) { + CreateTxn create = (CreateTxn)r; + String path = create.getPath(); + e = new TransactionEntry(hdr.getTime(), hdr.getClientId(), hdr.getCxid(), hdr.getZxid(), "create", path); + } + break; + case OpCode.setData: + if (r != null) { + SetDataTxn set = (SetDataTxn)r; + String path = set.getPath(); + e = new TransactionEntry(hdr.getTime(), hdr.getClientId(), hdr.getCxid(), hdr.getZxid(), "setData", path); + } + break; + case OpCode.setACL: + if (r != null) { + SetACLTxn setacl = (SetACLTxn)r; + String path = setacl.getPath(); + e = new TransactionEntry(hdr.getTime(), hdr.getClientId(), hdr.getCxid(), hdr.getZxid(), "setACL", path); + } + break; + case OpCode.error: + if (r != null) { + ErrorTxn error = (ErrorTxn)r; + + e = new TransactionEntry(hdr.getTime(), hdr.getClientId(), hdr.getCxid(), hdr.getZxid(), "error", "Error: " + error.getErr()); + } + break; + default: + LOG.info("Unknown op: " + hdr.getType()); + break; + } + + if (logStream.readByte("EOR") != 'B') { + throw new EOFException("Last transaction was partial."); + } + } catch (Exception ex) { + LOG.error("Error reading transaction from (" + src.file + ") :" + e); + return null; + } + return e; + } + + public boolean hasNext() { + return next != null; + } + + public LogEntry next() throws NoSuchElementException { + LogEntry ret = next; + LogEntry e = readNextEntry(); + + if (filter != null) { + try { + while (e != null && !filter.matches(e)) { + e = readNextEntry(); + } + } catch (FilterException fe) { + throw new NoSuchElementException(fe.toString()); + } + } + if (e != null && e.getTimestamp() < endtime) { + next = e; + } else { + next = null; + } + return ret; + } + + public void remove() throws UnsupportedOperationException { + throw new UnsupportedOperationException("remove not 
supported for Txn logs"); + } + + public void close() throws IOException { + reader.close(); + } + } + + public LogIterator iterator(long starttime, long endtime) throws IllegalArgumentException { + try { + return iterator(starttime, endtime, null); + } catch (FilterException fe) { + assert(false); // should never ever happen + return null; + } + } + + public LogIterator iterator(long starttime, long endtime, FilterOp filter) throws IllegalArgumentException, FilterException { + // sanitise start and end times + if (endtime < starttime) { + throw new IllegalArgumentException("End time (" + endtime + ") must be greater or equal to starttime (" + starttime + ")"); + } + + return new TxnLogSourceIterator(this, starttime, endtime, filter); + } + + public LogIterator iterator() throws IllegalArgumentException { + return iterator(starttime, endtime+1); + } + + public TxnLogSource(String file) throws IOException { + this.file = file; + + skiplist = new LogSkipList(); + + RandomAccessFileReader reader = new RandomAccessFileReader(new File(file)); + try { + BinaryInputArchive logStream = new BinaryInputArchive(reader); + FileHeader fhdr = new FileHeader(); + fhdr.deserialize(logStream, "fileheader"); + + byte[] bytes = null; + while (true) { + long lastFp = reader.getPosition(); + + long crcValue; + + try { + crcValue = logStream.readLong("crcvalue"); + bytes = logStream.readBuffer("txnEntry"); + } catch (EOFException e) { + break; + } + + if (bytes.length == 0) { + break; + } + Checksum crc = new Adler32(); + crc.update(bytes, 0, bytes.length); + if (crcValue != crc.getValue()) { + throw new IOException("CRC doesn't match " + crcValue + + " vs " + crc.getValue()); + } + if (logStream.readByte("EOR") != 'B') { + throw new EOFException("Last transaction was partial."); + } + TxnHeader hdr = new TxnHeader(); + Record r = SerializeUtils.deserializeTxn(bytes, hdr); + + if (starttime == 0) { + starttime = hdr.getTime(); + } + endtime = hdr.getTime(); + + if (size % skipN == 0) { 
+ skiplist.addMark(hdr.getTime(), lastFp, size); + } + size++; + } + if (bytes == null) { + throw new IOException("Nothing read from ("+file+")"); + } + } finally { + reader.close(); + } + } + + public String toString() { + return "TxnLogSource(file=" + file + ", size=" + size + ", start=" + starttime + ", end=" + endtime +")"; + } + + public static void main(String[] args) throws IOException, FilterException { + TxnLogSource s = new TxnLogSource(args[0]); + System.out.println(s); + + LogIterator iter; + + if (args.length == 3) { + long starttime = Long.valueOf(args[1]); + long endtime = Long.valueOf(args[2]); + FilterOp fo = new FilterParser("(or (and (> zxid 0x2f0bd6f5e0) (< zxid 0x2f0bd6f5e9)) (= operation \"error\"))").parse(); + System.out.println("fo: " + fo); + iter = s.iterator(starttime, endtime, fo); + } else { + iter = s.iterator(); + } + System.out.println(iter); + while (iter.hasNext()) { + System.out.println(iter.next()); + } + iter.close(); + } +}
