Author: davsclaus Date: Sun Jan 11 22:37:54 2009 New Revision: 733633 URL: http://svn.apache.org/viewvc?rev=733633&view=rev Log: CAMEL-1245: Improved tracer with pluggable TraceFormatter. Added TraceEvent and TraceEventMessage for routing TraceEvent to custom endpoints such as a JPA endpoint for persistence in a DB.
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java (contents, props changed) - copied, changed from r733098, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEvent.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEventMessage.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (with props) activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceFormatterTest.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java (contents, props changed) - copied, changed from r732940, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/FromEndpointTest.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/language/FileLanguageTest.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorWithOutBodyTraceTest.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorTest.java Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java Sun Jan 11 22:37:54 2009 @@ -16,8 +16,6 @@ */ package org.apache.camel.converter.stream; -import java.io.IOException; - import org.apache.camel.processor.interceptor.StreamCachingInterceptor; /** Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Sun Jan 11 22:37:54 2009 @@ -20,12 +20,12 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangeProperty; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.Endpoint; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.util.UuidGenerator; import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; @@ -58,9 +58,10 @@ this.pattern = pattern; } - public DefaultExchange(DefaultExchange parent) { + public DefaultExchange(Exchange parent) { 
this(parent.getContext(), parent.getPattern()); this.unitOfWork = parent.getUnitOfWork(); + this.fromEndpoint = parent.getFromEndpoint(); } public DefaultExchange(Endpoint fromEndpoint) { @@ -121,6 +122,7 @@ } private static Message safeCopy(Exchange exchange, Message message) { + // TODO: This method is not used if (message == null) { return null; } Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java (from r733098, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java) URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java&r1=733098&r2=733633&rev=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java Sun Jan 11 22:37:54 2009 @@ -18,16 +18,16 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.NoTypeConversionAvailableException; -import org.apache.camel.converter.stream.StreamCache; +import org.apache.camel.model.ProcessorType; import org.apache.camel.spi.UnitOfWork; -import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.MessageHelper; /** * @version $Revision$ */ -public class TraceFormatter { +public class DefaultTraceFormatter implements TraceFormatter { private int breadCrumbLength; + private int nodeLength; private boolean showBreadCrumb = true; private boolean showNode = true; private boolean 
showExchangeId; @@ -41,41 +41,38 @@ private boolean showOutBodyType; private boolean showException = true; - public Object format(TraceInterceptor interceptor, Exchange exchange) { + public Object format(final TraceInterceptor interceptor, final ProcessorType node, final Exchange exchange) { Message in = exchange.getIn(); - - // false because we don't want to introduce side effects - Message out = exchange.getOut(false); - - Throwable exception = exchange.getException(); + Message out = exchange.getOut(false); + StringBuilder sb = new StringBuilder(); - sb.append(getExchangeAndNode(interceptor, exchange)); + sb.append(extractBreadCrumb(interceptor, node, exchange)); if (showExchangePattern) { - sb.append(", Pattern:").append(exchange.getPattern()).append(" "); + sb.append(", Pattern:").append(exchange.getPattern()); } // only show properties if we have any if (showProperties && !exchange.getProperties().isEmpty()) { - sb.append(", Properties:").append(exchange.getProperties()).append(" "); + sb.append(", Properties:").append(exchange.getProperties()); } // only show headers if we have any if (showHeaders && !in.getHeaders().isEmpty()) { - sb.append(", Headers:").append(in.getHeaders()).append(" "); + sb.append(", Headers:").append(in.getHeaders()); } if (showBodyType) { - sb.append(", BodyType:").append(getBodyTypeAsString(in)).append(" "); + sb.append(", BodyType:").append(MessageHelper.getBodyTypeName(in)); } if (showBody) { - sb.append(", Body:").append(getBodyAsString(in)).append(" "); + sb.append(", Body:").append(MessageHelper.extractBodyAsString(in)); } if (showOutBodyType && out != null) { - sb.append(", OutBodyType:").append(getBodyTypeAsString(out)).append(" "); + sb.append(", OutBodyType:").append(MessageHelper.getBodyTypeName(out)); } if (showOutBody && out != null) { - sb.append(", OutBody:").append(getBodyAsString(out)).append(" "); + sb.append(", OutBody:").append(MessageHelper.extractBodyAsString(out)); } - if (showException && exception != null) { 
- sb.append(", Exception:").append(exception); + if (showException && exchange.getException() != null) { + sb.append(", Exception:").append(exchange.getException()); } return sb.toString(); @@ -185,6 +182,14 @@ this.showShortExchangeId = showShortExchangeId; } + public int getNodeLength() { + return nodeLength; + } + + public void setNodeLength(int nodeLength) { + this.nodeLength = nodeLength; + } + // Implementation methods //------------------------------------------------------------------------- protected Object getBreadCrumbID(Exchange exchange) { @@ -192,61 +197,25 @@ return unitOfWork.getId(); } - protected Object getBodyAsString(Message in) { - if (in == null) { - return null; - } - - StreamCache newBody = null; - try { - newBody = in.getBody(StreamCache.class); - if (newBody != null) { - in.setBody(newBody); - } - } catch (NoTypeConversionAvailableException ex) { - // ignore, in not of StreamCache type - } - - Object answer = null; - try { - answer = in.getBody(String.class); - } catch (NoTypeConversionAvailableException ex) { - answer = in.getBody(); - } - - if (newBody != null) { - // Reset the InputStreamCache - newBody.reset(); - } - return answer; - } - - protected Object getBodyTypeAsString(Message message) { - if (message == null) { - return null; - } - String answer = ObjectHelper.classCanonicalName(message.getBody()); - if (answer != null && answer.startsWith("java.lang.")) { - return answer.substring(10); + protected String getNodeMessage(ProcessorType node) { + String message = node.getShortName() + "(" + node.getLabel() + ")"; + if (nodeLength > 0) { + return String.format("%1$-" + nodeLength + "." 
+ nodeLength + "s", message); + } else { + return message; } - return answer; - } - - protected String getNodeMessage(TraceInterceptor interceptor) { - String message = interceptor.getNode().getShortName() + "(" + interceptor.getNode().getLabel() + ")"; - return String.format("%1$-25.25s", message); } /** - * Returns the exchange id and node, ordered based on whether this was a trace of + * Creates the breadcrumb based on whether this was a trace of * an exchange coming out of or into a processing step. For example, * <br/><tt>transform(body) -> ID-mojo/39713-1225468755256/2-0</tt> * <br/>or * <br/><tt>ID-mojo/39713-1225468755256/2-0 -> transform(body)</tt> */ - protected String getExchangeAndNode(TraceInterceptor interceptor, Exchange exchange) { + protected String extractBreadCrumb(TraceInterceptor interceptor, ProcessorType node, Exchange exchange) { String id = ""; - String node = ""; + String nodeMsg = ""; String result; if (!showBreadCrumb && !showExchangeId && !showShortExchangeId && !showNode) { @@ -264,13 +233,13 @@ } if (showNode) { - node = getNodeMessage(interceptor); + nodeMsg = getNodeMessage(node); } if (interceptor.shouldTraceOutExchanges() && exchange.getOut(false) != null) { - result = node.trim() + " -> " + id.trim(); + result = nodeMsg.trim() + " -> " + id.trim(); } else { - result = id.trim() + " -> " + node.trim(); + result = id.trim() + " -> " + nodeMsg.trim(); } if (breadCrumbLength > 0) { Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEvent.java 
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEvent.java?rev=733633&view=auto ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEvent.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEvent.java Sun Jan 11 22:37:54 2009 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.interceptor; + +import java.util.Date; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.impl.DefaultExchange; + +/** + * Represents a trace of an {@link org.apache.camel.Exchange}, intercepted at the given node + * that occurred during routing. + * <p/> + * The IN body contains {@link TraceEventMessage} with trace details of the original IN message. 
+ */ +public class TraceEvent extends DefaultExchange { + private String nodeId; + private String exchangeId; + private Date timestamp; + private Exchange tracedExchange; + + public TraceEvent(Exchange parent) { + super(parent); + } + + @Override + public Exchange newInstance() { + TraceEvent answer = new TraceEvent(this); + answer.setExchangeId(exchangeId); + answer.setNodeId(nodeId); + answer.setTimestamp(timestamp); + answer.setTracedExchange(tracedExchange); + return answer; + } + + /** + * Get the id of the node of the trace interception + */ + public String getNodeId() { + return nodeId; + } + + /** + * Timestamp of the interception + */ + public Date getTimestamp() { + return timestamp; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public void setTimestamp(Date timestamp) { + this.timestamp = timestamp; + } + + public Exchange getTracedExchange() { + return tracedExchange; + } + + public void setTracedExchange(Exchange tracedExchange) { + this.tracedExchange = tracedExchange; + } + + @Override + public ExchangePattern getPattern() { + return ExchangePattern.InOnly; + } + + @Override + public String toString() { + return "TraceEvent[" + tracedExchange.getExchangeId() + "] on node: " + nodeId; + } +} Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEventMessage.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEventMessage.java?rev=733633&view=auto ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEventMessage.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceEventMessage.java Sun Jan 11 22:37:54 2009 @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.interceptor; + +import java.io.Serializable; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.model.ProcessorType; +import org.apache.camel.util.MessageHelper; + +/** + * A trace event message that contains decomposed information about the traced + * {@link Exchange} at the point of interception. The information is stored as snapshot copies + * using String types. 
+ */ +public final class TraceEventMessage implements Serializable { + + private String fromEndpointUri; + private String node; + private String exchangeId; + private String shortExchangeId; + private String exchangePattern; + private String properties; + private String headers; + private String body; + private String bodyType; + private String outBody; + private String outBodyType; + private String exception; + + /** + * Creates a {...@link TraceEventMessage} based on the given node it was traced while processing + * the current {...@link Exchange} + * + * @param node the node where this trace is intercepted + * @param exchange the current {...@link Exchange} + */ + public TraceEventMessage(final ProcessorType node, final Exchange exchange) { + Message in = exchange.getIn(); + + // false because we don't want to introduce side effects + Message out = exchange.getOut(false); + + // need to use defensive copies to avoid Exchange altering after the point of interception + this.fromEndpointUri = exchange.getFromEndpoint() != null ? exchange.getFromEndpoint().getEndpointUri() : null; + this.node = extractNode(node); + this.exchangeId = exchange.getExchangeId(); + this.shortExchangeId = extractShortExchangeId(exchange); + this.exchangePattern = exchange.getPattern().toString(); + this.properties = exchange.getProperties().isEmpty() ? null : exchange.getProperties().toString(); + this.headers = in.getHeaders().isEmpty() ? null : in.getHeaders().toString(); + this.body = MessageHelper.extractBodyAsString(in); + this.bodyType = MessageHelper.getBodyTypeName(in); + this.outBody = MessageHelper.extractBodyAsString(out); + this.outBodyType = MessageHelper.getBodyTypeName(out); + this.exception = exchange.getException() != null ? 
exchange.getException().toString() : null; + } + + // Implementation + //--------------------------------------------------------------- + private String extractNode(ProcessorType node) { + return node.getShortName() + "(" + node.getLabel() + ")"; + } + + private String extractShortExchangeId(Exchange exchange) { + return exchange.getExchangeId().substring(exchange.getExchangeId().indexOf("/") + 1); + } + + // Properties + //--------------------------------------------------------------- + + public String getFromEndpointUri() { + return fromEndpointUri; + } + + public String getNode() { + return node; + } + + public String getExchangeId() { + return exchangeId; + } + + public String getShortExchangeId() { + return shortExchangeId; + } + + public String getExchangePattern() { + return exchangePattern; + } + + public String getProperties() { + return properties; + } + + public String getHeaders() { + return headers; + } + + public String getBody() { + return body; + } + + public String getBodyType() { + return bodyType; + } + + public String getOutBody() { + return outBody; + } + + public String getOutBodyType() { + return outBodyType; + } + + public String getException() { + return exception; + } + + @Override + public String toString() { + return "TraceEventMessage[" + exchangeId + "] for node: " + node; + } +} Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java?rev=733633&view=auto ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceFormatter.java Sun Jan 11 22:37:54 2009 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.interceptor; + +import org.apache.camel.Exchange; +import org.apache.camel.model.ProcessorType; + +/** + * Formatter to format trace logs when tracing {@link Exchange} during routing. + */ +public interface TraceFormatter { + + /** + * Formats a log message at given point of interception. 
+ * + * @param interceptor the tracing interceptor + * @param node the node where the interception occured + * @param exchange the current exchange + * @return the log message + */ + Object format(TraceInterceptor interceptor, ProcessorType node, Exchange exchange); +} Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java Sun Jan 11 22:37:54 2009 @@ -16,13 +16,18 @@ */ package org.apache.camel.processor.interceptor; +import java.util.Date; + import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.Producer; import org.apache.camel.model.InterceptorRef; import org.apache.camel.model.ProcessorType; import org.apache.camel.processor.DelegateProcessor; import org.apache.camel.processor.Logger; import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.util.ServiceHelper; +import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** @@ -31,7 +36,10 @@ * @version $Revision$ */ public class TraceInterceptor extends DelegateProcessor implements ExchangeFormatter { + private static final transient Log LOG = LogFactory.getLog(TraceInterceptor.class); + private static final String TRACE_EVENT = "CamelTraceEvent"; private Logger logger; + private Producer traceEventProducer; private final ProcessorType node; private final Tracer tracer; private TraceFormatter formatter; @@ -70,14 +78,30 @@ } public void process(final Exchange exchange) throws Exception { 
+ // interceptor will also trace routes supposed only for TraceEvents so we need to skip + // logging TraceEvents to avoid infinite looping + if (exchange instanceof TraceEvent || exchange.getProperty(TRACE_EVENT, Boolean.class) != null) { + // but we must still process to allow routing of TraceEvents to eg a JPA endpoint + super.process(exchange); + return; + } + + // okay this is a regular exchange being routed we might need to log and trace try { + // before if (shouldLogNode(node) && shouldLogExchange(exchange)) { logExchange(exchange); + traceExchange(exchange); } + + // process the exchange super.proceed(exchange); + + // after (trace out) if (tracer.isTraceOutExchanges() && shouldLogNode(node) && shouldLogExchange(exchange)) { logExchange(exchange); - } + traceExchange(exchange); + } } catch (Exception e) { if (shouldLogException(exchange)) { logException(exchange, e); @@ -87,7 +111,7 @@ } public Object format(Exchange exchange) { - return formatter.format(this, exchange); + return formatter.format(this, this.getNode(), exchange); } // Properties @@ -107,9 +131,36 @@ // Implementation methods //------------------------------------------------------------------------- protected void logExchange(Exchange exchange) { + // process the exchange that formats and logs it logger.process(exchange); } + protected void traceExchange(Exchange exchange) throws Exception { + // should we send a trace event to an optional destination? 
+ if (traceEventProducer != null) { + // create event and add it as a property on the original exchange + TraceEvent event = new TraceEvent(exchange); + event.setNodeId(node.getId()); + event.setTimestamp(new Date()); + event.setTracedExchange(exchange); + + // create event message to send in body + TraceEventMessage msg = new TraceEventMessage(node, exchange); + event.getIn().setBody(msg); + // marker property to indicate its a tracing event being routed in case + // new Exchange instances is created during trace routing so we can check + // for this marker when interceptor also kickins in during routing of trace events + event.setProperty(TRACE_EVENT, Boolean.TRUE); + // process the trace route + try { + traceEventProducer.process(event); + } catch (Exception e) { + // log and ignore this as the original Exchange should be allowed to continue + LOG.error("Error processing TraceEvent (original Exchange will be continued): " + event, e); + } + } + } + protected void logException(Exchange exchange, Throwable throwable) { if (tracer.isTraceExceptions()) { logger.process(exchange, throwable); @@ -132,11 +183,11 @@ /** * Returns whether exchanges coming out of processors should be traced - */ + */ public boolean shouldTraceOutExchanges() { return tracer.isTraceOutExchanges(); } - + /** * Returns true if the given node should be logged in the trace list */ @@ -150,4 +201,22 @@ return true; } + @Override + protected void doStart() throws Exception { + super.doStart(); + // in case of destination then create a producer to send the TraceEvent to + if (tracer.getDestination() != null) { + traceEventProducer = tracer.getDestination().createProducer(); + ServiceHelper.startService(traceEventProducer); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (traceEventProducer != null) { + ServiceHelper.stopService(traceEventProducer); + } + } + } \ No newline at end of file Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java Sun Jan 11 22:37:54 2009 @@ -19,7 +19,7 @@ import java.util.List; import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; +import org.apache.camel.Endpoint; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultCamelContext; @@ -34,7 +34,7 @@ */ public class Tracer implements InterceptStrategy { - private TraceFormatter formatter = new TraceFormatter(); + private TraceFormatter formatter = new DefaultTraceFormatter(); private boolean enabled = true; private String logName; private LoggingLevel logLevel; @@ -42,6 +42,7 @@ private boolean traceInterceptors; private boolean traceExceptions = true; private boolean traceOutExchanges; + private Endpoint destination; /** * A helper method to return the Tracer instance for a given {@link CamelContext} if one is enabled @@ -151,4 +152,20 @@ public boolean isTraceOutExchanges() { return traceOutExchanges; } + + public Endpoint getDestination() { + return destination; + } + + /** + * Sets an optional destination to send the traced Exchange wrapped in a {@link TraceEvent}. + * <p/> + * Can be used to store tracing as files, in a database or whatever. The routing of the Exchange + * will happen synchronously and the original route will first continue when this destination routing + * has been completed. 
+ */ + public void setDestination(Endpoint destination) { + this.destination = destination; + } + } \ No newline at end of file Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=733633&view=auto ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Sun Jan 11 22:37:54 2009 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import org.apache.camel.Message; +import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.converter.stream.StreamCache; + +/** + * Some helper methods when working with {...@link org.apache.camel.Message}. + * + * @version $Revision$ + */ +public final class MessageHelper { + + /** + * Utility classes should not have a public constructor. 
+ */ + private MessageHelper() { + } + + /** + * Extracts the given body and returns it as a String, that + * can be used for logging etc. + * <p/> + * Will handle stream based bodies wrapped in StreamCache. + * + * @param message the message with the body + * @return the body as String, can return <tt>null</tt> if no body + */ + public static String extractBodyAsString(Message message) { + if (message == null) { + return null; + } + + StreamCache newBody = null; + try { + newBody = message.getBody(StreamCache.class); + if (newBody != null) { + message.setBody(newBody); + } + } catch (NoTypeConversionAvailableException ex) { + // ignore, in not of StreamCache type + } + + Object answer; + try { + answer = message.getBody(String.class); + } catch (NoTypeConversionAvailableException ex) { + answer = message.getBody(); + } + + if (newBody != null) { + // Reset the InputStreamCache + newBody.reset(); + } + + return answer != null ? answer.toString() : null; + } + + /** + * Gets the given body class type name as a String. + * <p/> + * Will skip java.lang. for the built-in Java types. 
+ * + * @param message the message with the body + * @return the body typename as String, can return <tt>null</null> if no body + */ + public static String getBodyTypeName(Message message) { + if (message == null) { + return null; + } + String answer = ObjectHelper.classCanonicalName(message.getBody()); + if (answer != null && answer.startsWith("java.lang.")) { + return answer.substring(10); + } + return answer; + } +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/FromEndpointTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/FromEndpointTest.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/FromEndpointTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/FromEndpointTest.java Sun Jan 11 22:37:54 2009 @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -7,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,13 +16,13 @@ */ package org.apache.camel.impl; +import java.util.List; + import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; import org.apache.camel.Endpoint; -import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; - -import java.util.List; +import org.apache.camel.component.mock.MockEndpoint; /** * @version $Revision: 1.1 $ Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/language/FileLanguageTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/language/FileLanguageTest.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/language/FileLanguageTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/language/FileLanguageTest.java Sun Jan 11 22:37:54 2009 @@ -26,8 +26,8 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.LanguageTestSupport; import org.apache.camel.component.file.FileComponent; -import org.apache.camel.component.file.FileExchange; import org.apache.camel.component.file.FileEndpoint; +import org.apache.camel.component.file.FileExchange; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.language.simple.FileLanguage; Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorWithOutBodyTraceTest.java URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorWithOutBodyTraceTest.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorWithOutBodyTraceTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorWithOutBodyTraceTest.java Sun Jan 11 22:37:54 2009 @@ -16,13 +16,9 @@ */ package org.apache.camel.processor; -import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.interceptor.DefaultTraceFormatter; import org.apache.camel.processor.interceptor.Tracer; -import org.apache.camel.util.ExchangeHelper; public class TraceInterceptorWithOutBodyTraceTest extends TraceInterceptorTest { @@ -32,8 +28,15 @@ // START SNIPPET: tracingOutExchanges Tracer tracer = new Tracer(); tracer.setTraceOutExchanges(true); - tracer.getFormatter().setShowOutBody(true); - tracer.getFormatter().setShowOutBodyType(true); + + // we configure the default trace formatter where we can + // specify which fields we want in the output + DefaultTraceFormatter formatter = new DefaultTraceFormatter(); + formatter.setShowOutBody(true); + formatter.setShowOutBodyType(true); + + // set to use our formatter + tracer.setFormatter(formatter); getContext().addInterceptStrategy(tracer); // END SNIPPET: tracingOutExchanges Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceFormatterTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceFormatterTest.java?rev=733633&view=auto 
============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceFormatterTest.java (added) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceFormatterTest.java Sun Jan 11 22:37:54 2009 @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.interceptor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.ProcessorType; + +/** + * @version $Revision$ + */ +public class TraceFormatterTest extends ContextTestSupport { + + private List<String> tracedBodies = new ArrayList<String>(); + + public void testSendingSomeMessagesBeingTraced() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Bye World"); + + MockEndpoint mock = getMockEndpoint("mock:traced"); + mock.expectedMessageCount(4); + + template.sendBodyAndHeader("direct:start", "Hello London", "to", "James"); + + assertMockEndpointsSatisfied(); + + // assert we received the correct bodies at the given time of interception + // and that the bodies haven't changed during the routing of the original + // exchange that changes its body over time (Hello London -> Bye World) + assertEquals("Hello London", tracedBodies.get(0)); + assertEquals("Hello World", tracedBodies.get(1)); + assertEquals("Goodday World", tracedBodies.get(2)); + assertEquals("Bye World", tracedBodies.get(3)); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // START SNIPPET: e1 + // we create a tracer where we want to use our own formatter instead of the default one + Tracer tracer = new Tracer(); + + // use our own formatter instead of the default one + MyTraceFormatter formatter = new MyTraceFormatter(); + tracer.setFormatter(formatter); + + // and we must remember to add the tracer to Camel + getContext().addInterceptStrategy(tracer); + // END SNIPPET: e1 + + // this is only for unit testing to use mock for assertion + 
+ tracer.setDestination(context.getEndpoint("direct:traced")); + + from("direct:start") + .process(new MyProcessor("Hello World")) + .process(new MyProcessor("Goodday World")) + .process(new MyProcessor("Bye World")) + .to("mock:result"); + + from("direct:traced") + .process(new MyTraveAssertProcessor()) + .to("mock:traced"); + } + }; + } + + class MyProcessor implements Processor { + + private String msg; + + MyProcessor(String msg) { + this.msg = msg; + } + + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(msg); + } + } + + class MyTraveAssertProcessor implements Processor { + + public void process(Exchange exchange) throws Exception { + // take a snapshot at current time for assertion later + // after mock assertions in unit test method + TraceEventMessage event = exchange.getIn().getBody(TraceEventMessage.class); + tracedBodies.add(new String(event.getBody())); + } + } + + // START SNIPPET: e2 + // here we have our own formatter where we can create the output we want for trace logs + // as this is a test we just create a simple string with * around the body + class MyTraceFormatter implements TraceFormatter { + + public Object format(TraceInterceptor interceptor, ProcessorType node, Exchange exchange) { + return "***" + exchange.getIn().getBody(String.class) + "***"; + } + } + // END SNIPPET: e2 + +} \ No newline at end of file Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java (from r732940, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java) URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java&r1=732940&r2=733633&rev=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java Sun Jan 11 22:37:54 2009 @@ -14,48 +14,143 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.processor; +package org.apache.camel.processor.interceptor; + +import java.util.ArrayList; +import java.util.List; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.processor.interceptor.Tracer; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.LoggingLevel; /** * @version $Revision$ */ -public class TraceInterceptorTest extends ContextTestSupport { +public class TraceInterceptorDestinationTest extends ContextTestSupport { + + private List<String> tracedBodies = new ArrayList<String>(); + private List<String> tracedHeaders = new ArrayList<String>(); + + public void testSendingSomeMessagesBeingTraced() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Bye World", "Foo World"); + + MockEndpoint mock = getMockEndpoint("mock:traced"); + mock.expectedMessageCount(6); + // should be in our CSV format (defined in bottom of this class) + 
mock.message(0).body().regex("^direct:start;.*;.*;Hello London"); + mock.message(1).body().regex("^direct:start;.*;.*;Hello World"); + mock.message(2).body().regex("^direct:start;.*;.*;Goodday World"); + mock.message(3).body().regex("^direct:start;.*;.*;Bye World"); + mock.message(4).body().regex("^direct:foo;.*;.*;Hello Copenhagen"); + mock.message(5).body().regex("^direct:foo;.*;.*;Foo World"); - // START SNIPPET: e1 - public void testSendingSomeMessages() throws Exception { template.sendBodyAndHeader("direct:start", "Hello London", "to", "James"); - template.sendBodyAndHeader("direct:start", "This is Copenhagen calling", "from", "Claus"); + template.sendBody("direct:foo", "Hello Copenhagen"); + + assertMockEndpointsSatisfied(); + + // assert we received the correct bodies at the given time of interception + // and that the bodies haven't changed during the routing of the original + // exchange that changes its body over time (Hello London -> Bye World) + assertEquals("Hello London", tracedBodies.get(0)); + assertEquals("Hello World", tracedBodies.get(1)); + assertEquals("Goodday World", tracedBodies.get(2)); + assertEquals("Bye World", tracedBodies.get(3)); + assertEquals("Hello Copenhagen", tracedBodies.get(4)); + assertEquals("Foo World", tracedBodies.get(5)); + + // assert headers as well + assertEquals("{to=James}", tracedHeaders.get(0)); + assertEquals("{to=Hello}", tracedHeaders.get(1)); + assertEquals("{to=Goodday}", tracedHeaders.get(2)); + assertEquals("{to=Bye}", tracedHeaders.get(3)); + assertEquals("{to=Foo}", tracedHeaders.get(4)); } protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - // add tracer as an interceptor so it will log the exchange executions at runtime - // this can aid us to understand/see how the exchanges is routed etc. - getContext().addInterceptStrategy(new Tracer()); - - from("direct:start"). 
- process(new Processor() { - public void process(Exchange exchange) throws Exception { - // do nothing - } - - @Override - public String toString() { - return "MyProcessor"; - } - }). - to("mock:a"). - to("mock:b"); + // START SNIPPET: e1 + // we create a tracer where we want to send TraceEvents to an endpoint + // "direct:traced" where we can do some custom processing such as storing + // it in a file or a database + Tracer tracer = new Tracer(); + tracer.setDestination(context.getEndpoint("direct:traced")); + // we disable regular trace logging in the log file. You can omit this and + // have both. + tracer.setLogLevel(LoggingLevel.OFF); + // and we must remember to add the tracer to Camel + getContext().addInterceptStrategy(tracer); + // END SNIPPET: e1 + + from("direct:start") + .process(new MyProcessor("Hello World")) + .process(new MyProcessor("Goodday World")) + .process(new MyProcessor("Bye World")) + .to("mock:result"); + + from("direct:foo") + .process(new MyProcessor("Foo World")) + .to("mock:result"); + + from("direct:traced") + .process(new MyTraveAssertProcessor()) + .process(new MyTraceMessageProcessor()) + .to("mock:traced"); } }; } - // END SNIPPET: e1 -} + class MyProcessor implements Processor { + + private String msg; + + MyProcessor(String msg) { + this.msg = msg; + } + + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(msg); + exchange.getIn().setHeader("to", msg.split(" ")[0]); + } + } + + class MyTraveAssertProcessor implements Processor { + + public void process(Exchange exchange) throws Exception { + TraceEvent event = (TraceEvent) exchange; + assertNotNull(event); + assertEquals(event.getExchangeId(), exchange.getExchangeId()); + assertNotNull(event.getNodeId()); + assertNotNull(event.getTimestamp()); + + // take a snapshot at current time for assertion later + // after mock assertions in unit test method + TraceEventMessage msg = exchange.getIn().getBody(TraceEventMessage.class); + 
+ tracedBodies.add(msg.getBody()); + if (msg.getHeaders() != null) { + tracedHeaders.add(msg.getHeaders()); + } + } + } + + // START SNIPPET: e2 + class MyTraceMessageProcessor implements Processor { + + public void process(Exchange exchange) throws Exception { + // here we can transform the message however we like + TraceEventMessage msg = exchange.getIn().getBody(TraceEventMessage.class); + + // we want to store it as a CSV with fromEndpoint;node;exchangeId;body + String s = msg.getFromEndpointUri() + ";" + msg.getNode() + ";" + msg.getExchangeId() + ";" + msg.getBody(); + + // so we replace the IN body with our CSV string + exchange.getIn().setBody(s); + } + } + // END SNIPPET: e2 +} \ No newline at end of file Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorDestinationTest.java ------------------------------------------------------------------------------ svn:mergeinfo = Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorTest.java?rev=733633&r1=733632&r2=733633&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorTest.java (original) +++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TraceInterceptorTest.java Sun Jan 11 22:37:54 2009 @@ -19,15 +19,16 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.ProcessorType; import org.easymock.classextension.EasyMock; public class TraceInterceptorTest extends ContextTestSupport { - private TraceFormatter formatter; + private DefaultTraceFormatter formatter; private Tracer tracer; @Override protected void setUp() throws Exception { - formatter = EasyMock.createMock(TraceFormatter.class); + formatter = EasyMock.createMock(DefaultTraceFormatter.class); tracer = new Tracer(); super.setUp(); } @@ -44,7 +45,7 @@ public void testTracerInterceptor() throws Exception { EasyMock.reset(formatter); - formatter.format(EasyMock.isA(TraceInterceptor.class), EasyMock.isA(Exchange.class)); + formatter.format(EasyMock.isA(TraceInterceptor.class), EasyMock.isA(ProcessorType.class), EasyMock.isA(Exchange.class)); EasyMock.expectLastCall().andReturn("Test").atLeastOnce(); EasyMock.replay(formatter); template.sendBody("direct:a", "<hello>world!</hello>");