Repository: accumulo Updated Branches: refs/heads/master 34ca056b3 -> 6b5f5ef47
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SpanReceiver.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SpanReceiver.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SpanReceiver.java deleted file mode 100644 index b44e51e..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SpanReceiver.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.receivers; - -import java.util.Map; - -/** - * The collector within a process that is the destination of Spans when a trace is running. - */ -public interface SpanReceiver { - void span(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data); - - void flush(); -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java deleted file mode 100644 index 84e3204..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.receivers; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooKeeper.States; - -/** - * Find a Span collector via zookeeper and push spans there via Thrift RPC - * - */ -public class ZooSpanClient extends SendSpansViaThrift { - - private static final Logger log = Logger.getLogger(ZooSpanClient.class); - private static final int TOTAL_TIME_WAIT_CONNECT_MS = 10 * 1000; - private static final int TIME_WAIT_CONNECT_CHECK_MS = 100; - - ZooKeeper zoo = null; - final String path; - final Random random = new Random(); - final List<String> hosts = new ArrayList<String>(); - - public ZooSpanClient(String keepers, final String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { - super(host, service, millis); - this.path = path; - zoo = new ZooKeeper(keepers, 30 * 1000, new Watcher() { - @Override - public void process(WatchedEvent event) { - try { - if (zoo != null) { - updateHosts(path, zoo.getChildren(path, null)); - } - } catch (Exception ex) { - log.error("unable to get destination hosts in zookeeper", ex); - } - } - }); - for (int i = 0; i < TOTAL_TIME_WAIT_CONNECT_MS; i += TIME_WAIT_CONNECT_CHECK_MS) { - if (zoo.getState().equals(States.CONNECTED)) - break; - try { - Thread.sleep(TIME_WAIT_CONNECT_CHECK_MS); - } catch (InterruptedException ex) { - break; - } - } - zoo.getChildren(path, true); - } - - @Override - public void flush() { - if (!hosts.isEmpty()) - super.flush(); - } - - @Override - void sendSpans() { - if (hosts.isEmpty()) { - if (!sendQueue.isEmpty()) { - log.error("No hosts to send data to, dropping queued spans"); - synchronized (sendQueue) { - sendQueue.clear(); - sendQueue.notifyAll(); - } - } - } else { - super.sendSpans(); - } - } - - synchronized private void updateHosts(String path, List<String> children) { - log.debug("Scanning trace hosts in zookeeper: " + path); - try { - List<String> hosts = new ArrayList<String>(); - for (String child : children) { - byte[] data = zoo.getData(path + "/" + child, null, null); - hosts.add(new String(data, UTF_8)); - } - this.hosts.clear(); - this.hosts.addAll(hosts); - log.debug("Trace hosts: " + this.hosts); - } catch (Exception ex) { - log.error("unable to get destination hosts in zookeeper", ex); - } - } - - @Override - synchronized protected String getSpanKey(Map<String,String> data) { - if (hosts.size() > 0) { - String host = hosts.get(random.nextInt(hosts.size())); - log.debug("sending data to " + host); - return host; - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/thrift/Annotation.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/thrift/Annotation.java b/trace/src/main/java/org/apache/accumulo/trace/thrift/Annotation.java new file mode 100644 index 0000000..c00744e --- /dev/null +++ b/trace/src/main/java/org/apache/accumulo/trace/thrift/Annotation.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.9.1) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.accumulo.trace.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class Annotation implements org.apache.thrift.TBase<Annotation, Annotation._Fields>, java.io.Serializable, Cloneable, Comparable<Annotation> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Annotation"); + + private static final org.apache.thrift.protocol.TField TIME_FIELD_DESC = new org.apache.thrift.protocol.TField("time", org.apache.thrift.protocol.TType.I64, (short)1); + private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)2); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new AnnotationStandardSchemeFactory()); + schemes.put(TupleScheme.class, new AnnotationTupleSchemeFactory()); + } + + public long time; // required + public String msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { + TIME((short)1, "time"), + MSG((short)2, "msg"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // TIME + return TIME; + case 2: // MSG + return MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __TIME_ISSET_ID = 0; + private byte __isset_bitfield = 0; + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.TIME, new org.apache.thrift.meta_data.FieldMetaData("time", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Annotation.class, metaDataMap); + } + + public Annotation() { + } + + public Annotation( + long time, + String msg) + { + this(); + this.time = time; + setTimeIsSet(true); + this.msg = msg; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public Annotation(Annotation other) { + __isset_bitfield = other.__isset_bitfield; + this.time = other.time; + if (other.isSetMsg()) { + this.msg = other.msg; + } + } + + public Annotation deepCopy() { + return new Annotation(this); + } + + @Override + public void clear() { + setTimeIsSet(false); + this.time = 0; + this.msg = null; + } + + public long getTime() { + return this.time; + } + + public Annotation setTime(long time) { + this.time = time; + setTimeIsSet(true); + return this; + } + + public void unsetTime() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIME_ISSET_ID); + } + + /** Returns true if field time is set (has been assigned a value) and false otherwise */ + public boolean isSetTime() { + return EncodingUtils.testBit(__isset_bitfield, __TIME_ISSET_ID); + } + + public void setTimeIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIME_ISSET_ID, value); + } + + public String getMsg() { + return this.msg; + } + + public Annotation setMsg(String msg) { + this.msg = msg; + return this; + } + + public void unsetMsg() { + this.msg = null; + } + + /** Returns true if field msg is set (has been assigned a value) and false otherwise */ + public boolean isSetMsg() { + return this.msg != null; + } + + public void setMsgIsSet(boolean value) { + if (!value) { + this.msg = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case TIME: + if (value == null) { + unsetTime(); + } else { + setTime((Long)value); + } + break; + + case MSG: + if (value == null) { + unsetMsg(); + } else { + setMsg((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case TIME: + return Long.valueOf(getTime()); + + case MSG: + return getMsg(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case TIME: + return isSetTime(); + case MSG: + return isSetMsg(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof Annotation) + return this.equals((Annotation)that); + return false; + } + + public boolean equals(Annotation that) { + if (that == null) + return false; + + boolean this_present_time = true; + boolean that_present_time = true; + if (this_present_time || that_present_time) { + if (!(this_present_time && that_present_time)) + return false; + if (this.time != that.time) + return false; + } + + boolean this_present_msg = true && this.isSetMsg(); + boolean that_present_msg = true && that.isSetMsg(); + if (this_present_msg || that_present_msg) { + if (!(this_present_msg && that_present_msg)) + return false; + if (!this.msg.equals(that.msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public int compareTo(Annotation other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetTime()).compareTo(other.isSetTime()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTime()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.time, other.time); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMsg()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("Annotation("); + boolean first = true; + + sb.append("time:"); + sb.append(this.time); + first = false; + if (!first) sb.append(", "); + sb.append("msg:"); + if (this.msg == null) { + sb.append("null"); + } else { + sb.append(this.msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class AnnotationStandardSchemeFactory implements SchemeFactory { + public AnnotationStandardScheme getScheme() { + return new AnnotationStandardScheme(); + } + } + + private static class AnnotationStandardScheme extends StandardScheme<Annotation> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, Annotation struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // TIME + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.time = iprot.readI64(); + struct.setTimeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // MSG + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.msg = iprot.readString(); + struct.setMsgIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, Annotation struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(TIME_FIELD_DESC); + oprot.writeI64(struct.time); + oprot.writeFieldEnd(); + if (struct.msg != null) { + oprot.writeFieldBegin(MSG_FIELD_DESC); + oprot.writeString(struct.msg); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class AnnotationTupleSchemeFactory implements SchemeFactory { + public AnnotationTupleScheme getScheme() { + return new AnnotationTupleScheme(); + } + } + + private static class AnnotationTupleScheme extends TupleScheme<Annotation> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, Annotation struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetTime()) { + optionals.set(0); + } + if (struct.isSetMsg()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetTime()) { + oprot.writeI64(struct.time); + } + if (struct.isSetMsg()) { + oprot.writeString(struct.msg); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, Annotation struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.time = iprot.readI64(); + struct.setTimeIsSet(true); + } + if (incoming.get(1)) { + struct.msg = iprot.readString(); + struct.setMsgIsSet(true); + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/thrift/RemoteSpan.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/thrift/RemoteSpan.java b/trace/src/main/java/org/apache/accumulo/trace/thrift/RemoteSpan.java index 416ae17..bfe183d 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/thrift/RemoteSpan.java +++ b/trace/src/main/java/org/apache/accumulo/trace/thrift/RemoteSpan.java @@ -59,7 +59,8 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField START_FIELD_DESC = new org.apache.thrift.protocol.TField("start", org.apache.thrift.protocol.TType.I64, (short)6); private static final org.apache.thrift.protocol.TField STOP_FIELD_DESC = new org.apache.thrift.protocol.TField("stop", org.apache.thrift.protocol.TType.I64, (short)7); private static final org.apache.thrift.protocol.TField DESCRIPTION_FIELD_DESC = new org.apache.thrift.protocol.TField("description", org.apache.thrift.protocol.TType.STRING, (short)8); - private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.MAP, (short)9); + private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.MAP, (short)10); + private static final org.apache.thrift.protocol.TField ANNOTATIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("annotations", org.apache.thrift.protocol.TType.LIST, (short)11); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -75,7 +76,8 @@ import org.slf4j.LoggerFactory; public long start; // required public long stop; // required public String description; // required - public Map<String,String> data; // required + public Map<ByteBuffer,ByteBuffer> data; // required + public List<Annotation> annotations; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -87,7 +89,8 @@ import org.slf4j.LoggerFactory; START((short)6, "start"), STOP((short)7, "stop"), DESCRIPTION((short)8, "description"), - DATA((short)9, "data"); + DATA((short)10, "data"), + ANNOTATIONS((short)11, "annotations"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -118,8 +121,10 @@ import org.slf4j.LoggerFactory; return STOP; case 8: // DESCRIPTION return DESCRIPTION; - case 9: // DATA + case 10: // DATA return DATA; + case 11: // ANNOTATIONS + return ANNOTATIONS; default: return null; } @@ -187,8 +192,11 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true), + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)))); + tmpMap.put(_Fields.ANNOTATIONS, new org.apache.thrift.meta_data.FieldMetaData("annotations", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, Annotation.class)))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteSpan.class, metaDataMap); } @@ -205,7 +213,8 @@ import org.slf4j.LoggerFactory; long start, long stop, String description, - Map<String,String> data) + Map<ByteBuffer,ByteBuffer> data, + List<Annotation> annotations) { this(); this.sender = sender; @@ -222,6 +231,7 @@ import org.slf4j.LoggerFactory; setStopIsSet(true); this.description = description; this.data = data; + this.annotations = annotations; } /** @@ -244,9 +254,16 @@ import org.slf4j.LoggerFactory; this.description = other.description; } if (other.isSetData()) { - Map<String,String> __this__data = new HashMap<String,String>(other.data); + Map<ByteBuffer,ByteBuffer> __this__data = new HashMap<ByteBuffer,ByteBuffer>(other.data); this.data = __this__data; } + if (other.isSetAnnotations()) { + List<Annotation> __this__annotations = new ArrayList<Annotation>(other.annotations.size()); + for (Annotation other_element : other.annotations) { + __this__annotations.add(new Annotation(other_element)); + } + this.annotations = __this__annotations; + } } public RemoteSpan deepCopy() { @@ -269,6 +286,7 @@ import org.slf4j.LoggerFactory; this.stop = 0; this.description = null; this.data = null; + this.annotations = null; } public String getSender() { @@ -462,18 +480,18 @@ import org.slf4j.LoggerFactory; return (this.data == null) ? 0 : this.data.size(); } - public void putToData(String key, String val) { + public void putToData(ByteBuffer key, ByteBuffer val) { if (this.data == null) { - this.data = new HashMap<String,String>(); + this.data = new HashMap<ByteBuffer,ByteBuffer>(); } this.data.put(key, val); } - public Map<String,String> getData() { + public Map<ByteBuffer,ByteBuffer> getData() { return this.data; } - public RemoteSpan setData(Map<String,String> data) { + public RemoteSpan setData(Map<ByteBuffer,ByteBuffer> data) { this.data = data; return this; } @@ -493,6 +511,45 @@ import org.slf4j.LoggerFactory; } } + public int getAnnotationsSize() { + return (this.annotations == null) ? 0 : this.annotations.size(); + } + + public java.util.Iterator<Annotation> getAnnotationsIterator() { + return (this.annotations == null) ? null : this.annotations.iterator(); + } + + public void addToAnnotations(Annotation elem) { + if (this.annotations == null) { + this.annotations = new ArrayList<Annotation>(); + } + this.annotations.add(elem); + } + + public List<Annotation> getAnnotations() { + return this.annotations; + } + + public RemoteSpan setAnnotations(List<Annotation> annotations) { + this.annotations = annotations; + return this; + } + + public void unsetAnnotations() { + this.annotations = null; + } + + /** Returns true if field annotations is set (has been assigned a value) and false otherwise */ + public boolean isSetAnnotations() { + return this.annotations != null; + } + + public void setAnnotationsIsSet(boolean value) { + if (!value) { + this.annotations = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case SENDER: @@ -563,7 +620,15 @@ import org.slf4j.LoggerFactory; if (value == null) { unsetData(); } else { - setData((Map<String,String>)value); + setData((Map<ByteBuffer,ByteBuffer>)value); + } + break; + + case ANNOTATIONS: + if (value == null) { + unsetAnnotations(); + } else { + setAnnotations((List<Annotation>)value); } break; @@ -599,6 +664,9 @@ import org.slf4j.LoggerFactory; case DATA: return getData(); + case ANNOTATIONS: + return getAnnotations(); + } throw new IllegalStateException(); } @@ -628,6 +696,8 @@ import org.slf4j.LoggerFactory; return isSetDescription(); case DATA: return isSetData(); + case ANNOTATIONS: + return isSetAnnotations(); } throw new IllegalStateException(); } @@ -726,6 +796,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_annotations = true && this.isSetAnnotations(); + boolean that_present_annotations = true && that.isSetAnnotations(); + if (this_present_annotations || that_present_annotations) { + if (!(this_present_annotations && that_present_annotations)) + return false; + if (!this.annotations.equals(that.annotations)) + return false; + } + return true; } @@ -832,6 +911,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetAnnotations()).compareTo(other.isSetAnnotations()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetAnnotations()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.annotations, other.annotations); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -903,6 +992,14 @@ import org.slf4j.LoggerFactory; sb.append(this.data); } first = false; + if (!first) sb.append(", "); + sb.append("annotations:"); + if (this.annotations == null) { + sb.append("null"); + } else { + sb.append(this.annotations); + } + first = false; sb.append(")"); return sb.toString(); } @@ -1012,17 +1109,17 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 9: // DATA + case 10: // DATA if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin(); - struct.data = new HashMap<String,String>(2*_map0.size); + struct.data = new HashMap<ByteBuffer,ByteBuffer>(2*_map0.size); for (int _i1 = 0; _i1 < _map0.size; ++_i1) { - String _key2; - String _val3; - _key2 = iprot.readString(); - _val3 = iprot.readString(); + ByteBuffer _key2; + ByteBuffer _val3; + _key2 = iprot.readBinary(); + _val3 = iprot.readBinary(); struct.data.put(_key2, _val3); } iprot.readMapEnd(); @@ -1032,6 +1129,25 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 11: // ANNOTATIONS + if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list4 = iprot.readListBegin(); + struct.annotations = new ArrayList<Annotation>(_list4.size); + for (int _i5 = 0; _i5 < _list4.size; ++_i5) + { + Annotation _elem6; + _elem6 = new Annotation(); + _elem6.read(iprot); + struct.annotations.add(_elem6); + } + iprot.readListEnd(); + } + struct.setAnnotationsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1081,15 +1197,27 @@ import org.slf4j.LoggerFactory; oprot.writeFieldBegin(DATA_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.data.size())); - for (Map.Entry<String, String> _iter4 : struct.data.entrySet()) + for (Map.Entry<ByteBuffer, ByteBuffer> _iter7 : struct.data.entrySet()) { - oprot.writeString(_iter4.getKey()); - oprot.writeString(_iter4.getValue()); + oprot.writeBinary(_iter7.getKey()); + oprot.writeBinary(_iter7.getValue()); } oprot.writeMapEnd(); } oprot.writeFieldEnd(); } + if (struct.annotations != null) { + oprot.writeFieldBegin(ANNOTATIONS_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.annotations.size())); + for (Annotation _iter8 : struct.annotations) + { + _iter8.write(oprot); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1135,7 +1263,10 @@ import org.slf4j.LoggerFactory; if (struct.isSetData()) { optionals.set(8); } - oprot.writeBitSet(optionals, 9); + if (struct.isSetAnnotations()) { + optionals.set(9); + } + oprot.writeBitSet(optionals, 10); if (struct.isSetSender()) { oprot.writeString(struct.sender); } @@ -1163,10 +1294,19 @@ import org.slf4j.LoggerFactory; if (struct.isSetData()) { { oprot.writeI32(struct.data.size()); - for (Map.Entry<String, String> _iter5 : struct.data.entrySet()) + for (Map.Entry<ByteBuffer, ByteBuffer> _iter9 : struct.data.entrySet()) { - oprot.writeString(_iter5.getKey()); - oprot.writeString(_iter5.getValue()); + oprot.writeBinary(_iter9.getKey()); + oprot.writeBinary(_iter9.getValue()); + } + } + } + if (struct.isSetAnnotations()) { + { + oprot.writeI32(struct.annotations.size()); + for (Annotation _iter10 : struct.annotations) + { + _iter10.write(oprot); } } } @@ -1175,7 +1315,7 @@ import org.slf4j.LoggerFactory; @Override public void read(org.apache.thrift.protocol.TProtocol prot, RemoteSpan struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(9); + BitSet incoming = iprot.readBitSet(10); if (incoming.get(0)) { struct.sender = iprot.readString(); struct.setSenderIsSet(true); @@ -1210,19 +1350,33 @@ import org.slf4j.LoggerFactory; } if (incoming.get(8)) { { - org.apache.thrift.protocol.TMap _map6 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.data = new HashMap<String,String>(2*_map6.size); - for (int _i7 = 0; _i7 < _map6.size; ++_i7) + org.apache.thrift.protocol.TMap _map11 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); + struct.data = new HashMap<ByteBuffer,ByteBuffer>(2*_map11.size); + for (int _i12 = 0; _i12 < _map11.size; ++_i12) { - String _key8; - String _val9; - _key8 = iprot.readString(); - _val9 = iprot.readString(); - struct.data.put(_key8, _val9); + ByteBuffer _key13; + ByteBuffer _val14; + _key13 = iprot.readBinary(); + _val14 = iprot.readBinary(); + struct.data.put(_key13, _val14); } } struct.setDataIsSet(true); } + if (incoming.get(9)) { + { + org.apache.thrift.protocol.TList _list15 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.annotations = new ArrayList<Annotation>(_list15.size); + for (int _i16 = 0; _i16 < _list15.size; ++_i16) + { + Annotation _elem17; + _elem17 = new Annotation(); + _elem17.read(iprot); + struct.annotations.add(_elem17); + } + } + struct.setAnnotationsIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/thrift/trace.thrift ---------------------------------------------------------------------- diff --git a/trace/src/main/thrift/trace.thrift b/trace/src/main/thrift/trace.thrift index 76bcafe..b7e0abf 100644 --- a/trace/src/main/thrift/trace.thrift +++ b/trace/src/main/thrift/trace.thrift @@ -17,6 +17,11 @@ namespace java org.apache.accumulo.trace.thrift namespace cpp org.apache.accumulo.trace.thrift +struct Annotation { + 1:i64 time, + 2:string msg +} + struct RemoteSpan { 1:string sender, 2:string svc, @@ -26,7 +31,8 @@ struct RemoteSpan { 6:i64 start, 7:i64 stop, 8:string description, - 9:map<string, string> data + 10:map<binary, binary> data, + 11:list<Annotation> annotations } struct TInfo { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/test/java/org/apache/accumulo/trace/instrument/TracerTest.java ---------------------------------------------------------------------- diff --git a/trace/src/test/java/org/apache/accumulo/trace/instrument/TracerTest.java b/trace/src/test/java/org/apache/accumulo/trace/instrument/TracerTest.java index f338bd8..4afdebe 100644 --- a/trace/src/test/java/org/apache/accumulo/trace/instrument/TracerTest.java +++ b/trace/src/test/java/org/apache/accumulo/trace/instrument/TracerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; @@ -30,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import org.apache.accumulo.trace.instrument.receivers.SpanReceiver; import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.accumulo.trace.thrift.TInfo; import org.apache.accumulo.trace.thrift.TestService; @@ -43,13 +43,17 @@ import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.htrace.HTraceConfiguration; +import org.htrace.Sampler; +import org.htrace.SpanReceiver; +import org.htrace.wrappers.TraceProxy; import org.junit.Before; import org.junit.Test; public class TracerTest { static class SpanStruct { - public SpanStruct(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data) { + public SpanStruct(long traceId, long spanId, long parentId, long start, long stop, String description, Map<byte[],byte[]> data) { super(); this.traceId = traceId; this.spanId = spanId; @@ -66,7 +70,7 @@ public class TracerTest { public long start; public long stop; public String description; - public Map<String,String> data; + public Map<byte[],byte[]> data; public long millis() { return stop - start; @@ -77,21 +81,29 @@ public class TracerTest { public Map<Long,List<SpanStruct>> traces = new HashMap<Long,List<SpanStruct>>(); @Override - public void span(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data) { - SpanStruct span = new SpanStruct(traceId, spanId, parentId, start, stop, description, data); + public void receiveSpan(org.htrace.Span s) { + long traceId = s.getTraceId(); + SpanStruct span = new SpanStruct(traceId, s.getSpanId(), s.getParentId(), s.getStartTimeMillis(), s.getStopTimeMillis(), s.getDescription(), + s.getKVAnnotations()); if (!traces.containsKey(traceId)) traces.put(traceId, new ArrayList<SpanStruct>()); traces.get(traceId).add(span); } @Override - public void flush() {} + public void configure(HTraceConfiguration conf) { + } + + @Override + public void close() throws IOException { + } } + @SuppressWarnings("deprecation") @Test public void testTrace() throws Exception { TestReceiver tracer = new TestReceiver(); - Tracer.getInstance().addReceiver(tracer); + org.htrace.Trace.addReceiver(tracer); assertFalse(Trace.isTracing()); Trace.start("nop").stop(); @@ -103,12 +115,12 @@ public class TracerTest { assertFalse(Trace.isTracing()); Span start = Trace.on("testing"); - assertEquals(Trace.currentTrace(), start); + assertEquals(Trace.currentTrace().getSpan(), start.getScope().getSpan()); assertTrue(Trace.isTracing()); - Trace.start("shortest trace ever"); - Trace.currentTrace().stop(); - long traceId = Trace.currentTrace().traceId(); + Span span = Trace.start("shortest trace ever"); + span.stop(); + long traceId = Trace.currentTraceId(); assertNotNull(tracer.traces.get(traceId)); assertTrue(tracer.traces.get(traceId).size() == 1); assertEquals("shortest trace ever", tracer.traces.get(traceId).get(0).description); @@ -149,7 +161,7 @@ public class TracerTest { @Test public void testThrift() throws Exception { TestReceiver tracer = new TestReceiver(); - Tracer.getInstance().addReceiver(tracer); + org.htrace.Trace.addReceiver(tracer); ServerSocket socket = new ServerSocket(0); TServerSocket transport = new TServerSocket(socket); @@ -195,25 +207,26 @@ public class TracerTest { } /** - * Verify that exceptions propagate up through the trace wrapping with sampling enabled, instead of seeing the reflexive exceptions. + * Verify that exceptions propagate up through the trace wrapping with sampling enabled, as the cause of the reflexive exceptions. */ @Test(expected = IOException.class) - public void testTracedException() throws Exception { - TraceProxy.trace(callable).call(); + public void testTracedException() throws Throwable { + try { + TraceProxy.trace(callable).call(); + } catch (InvocationTargetException e) { + throw e.getCause(); + } } /** - * Verify that exceptions propagate up through the trace wrapping with sampling disabled, instead of seeing the reflexive exceptions. + * Verify that exceptions propagate up through the trace wrapping with sampling disabled, as the cause of the reflexive exceptions. */ @Test(expected = IOException.class) - public void testUntracedException() throws Exception { - Sampler never = new Sampler() { - @Override - public boolean next() { - return false; - } - }; - - TraceProxy.trace(callable, never).call(); + public void testUntracedException() throws Throwable { + try { + TraceProxy.trace(callable, Sampler.NEVER).call(); + } catch (InvocationTargetException e) { + throw e.getCause(); + } } }
