Repository: karaf-decanter Updated Branches: refs/heads/master 4e53beb01 -> a3d3ea29a
http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java ---------------------------------------------------------------------- diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java new file mode 100644 index 0000000..5c0769d --- /dev/null +++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java @@ -0,0 +1,346 @@ +package org.apache.karaf.decanter.collector.camel; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.EventObject; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.RouteNode; +import org.apache.camel.spi.TracedRouteNodes; +import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.util.MessageHelper; +import org.apache.camel.management.event.AbstractExchangeEvent; +import org.apache.camel.management.event.CamelContextResumeFailureEvent; +import org.apache.camel.management.event.CamelContextResumedEvent; +import org.apache.camel.management.event.CamelContextResumingEvent; +import org.apache.camel.management.event.CamelContextStartedEvent; +import org.apache.camel.management.event.CamelContextStartingEvent; +import org.apache.camel.management.event.CamelContextStartupFailureEvent; +import org.apache.camel.management.event.CamelContextStopFailureEvent; +import org.apache.camel.management.event.CamelContextStoppedEvent; +import org.apache.camel.management.event.CamelContextStoppingEvent; +import org.apache.camel.management.event.CamelContextSuspendedEvent; +import org.apache.camel.management.event.CamelContextSuspendingEvent; +import org.apache.camel.management.event.ExchangeCompletedEvent; +import 
org.apache.camel.management.event.ExchangeCreatedEvent; +import org.apache.camel.management.event.ExchangeFailureHandledEvent; +import org.apache.camel.management.event.ExchangeRedeliveryEvent; +import org.apache.camel.management.event.ExchangeSendingEvent; +import org.apache.camel.management.event.ExchangeSentEvent; +import org.apache.camel.management.event.RouteAddedEvent; +import org.apache.camel.management.event.RouteRemovedEvent; +import org.apache.camel.management.event.RouteStartedEvent; +import org.apache.camel.management.event.RouteStoppedEvent; +import org.apache.camel.management.event.ServiceStartupFailureEvent; +import org.apache.camel.management.event.ServiceStopFailureEvent; + +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DecanterEventNotifier extends EventNotifierSupport { + + private static final Logger LOG = LoggerFactory.getLogger(DecanterEventNotifier.class.getName()); + + private EventAdmin eventAdmin; + private String camelContextMatcher = ".*"; + private String routeMatcher = ".*"; + private DecanterCamelEventExtender extender = null; + + public EventAdmin getEventAdmin() { + return eventAdmin; + } + + public void setEventAdmin(EventAdmin eventAdmin) { + this.eventAdmin = eventAdmin; + } + + public void setCamelContextMatcher(String camelContextMatcher) { + this.camelContextMatcher = camelContextMatcher; + } + + public void setRouteMatcher(String routeMatcher) { + this.routeMatcher = routeMatcher; + } + + public void setExtender(DecanterCamelEventExtender extender) { + this.extender = extender; + } + + @Override + public boolean isEnabled(EventObject eventObject) { + if (eventObject != null) { + if (eventObject instanceof AbstractExchangeEvent) { + AbstractExchangeEvent event = (AbstractExchangeEvent) eventObject; + if (event.getExchange().getFromRouteId() != null) { + return 
(event.getExchange().getFromRouteId().matches(routeMatcher) && event.getExchange().getContext().getName().matches(camelContextMatcher)); + } else { + return (event.getExchange().getContext().getName().matches(camelContextMatcher)); + } + } + if (eventObject instanceof CamelContextResumedEvent) { + return ((CamelContextResumedEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextResumeFailureEvent) { + return ((CamelContextResumeFailureEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextResumingEvent) { + return ((CamelContextResumingEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextStartedEvent) { + return ((CamelContextStartedEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextStartingEvent) { + return ((CamelContextStartingEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextStartupFailureEvent) { + return ((CamelContextStartupFailureEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextStopFailureEvent) { + return ((CamelContextStopFailureEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextStoppedEvent) { + return ((CamelContextStoppedEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof CamelContextStoppingEvent) { + return ((CamelContextStoppingEvent) eventObject).getContext().getName().matches(camelContextMatcher); + } + if (eventObject instanceof RouteAddedEvent) { + return ((RouteAddedEvent) eventObject).getRoute().getRouteContext().getCamelContext().getName().matches(camelContextMatcher) + && ((RouteAddedEvent) 
eventObject).getRoute().getId().matches(routeMatcher); + } + if (eventObject instanceof RouteRemovedEvent) { + return ((RouteRemovedEvent) eventObject).getRoute().getRouteContext().getCamelContext().getName().matches(camelContextMatcher) + && ((RouteRemovedEvent) eventObject).getRoute().getId().matches(routeMatcher); + } + if (eventObject instanceof RouteStartedEvent) { + return ((RouteStartedEvent) eventObject).getRoute().getRouteContext().getCamelContext().getName().matches(camelContextMatcher) + && ((RouteStartedEvent) eventObject).getRoute().getId().matches(routeMatcher); + } + if (eventObject instanceof RouteStoppedEvent) { + return ((RouteStoppedEvent) eventObject).getRoute().getRouteContext().getCamelContext().getName().matches(camelContextMatcher) + && ((RouteStoppedEvent) eventObject).getRoute().getId().matches(routeMatcher); + } + } + return false; + } + + public void notify(EventObject event) throws Exception { + try { + Map<String, Object> eventMap = createEvent(event, + event instanceof AbstractExchangeEvent ? 
((AbstractExchangeEvent) event) + .getExchange() : null); + boolean post = false; + if (event instanceof ExchangeSentEvent && !isIgnoreExchangeEvents() && !isIgnoreExchangeSentEvents()) { + ExchangeSentEvent sent = (ExchangeSentEvent) event; + eventMap.put("sentToEndpointUri", sent.getEndpoint() + .getEndpointUri()); + eventMap.put("sentTimeTaken", sent.getTimeTaken()); + post = true; + } + if (event instanceof ExchangeSendingEvent && !isIgnoreExchangeEvents() && !isIgnoreExchangeSendingEvents()) { + ExchangeSendingEvent sending = (ExchangeSendingEvent) event; + eventMap.put("sendingToEndpointUri", sending.getEndpoint().getEndpointUri()); + post = true; + } + if (event instanceof ExchangeFailureHandledEvent && !isIgnoreExchangeEvents() && !isIgnoreExchangeFailedEvents()) { + ExchangeFailureHandledEvent failHandled = (ExchangeFailureHandledEvent) event; + eventMap.put("failureIsHandled", failHandled.isHandled()); + eventMap.put("failureIsDeadLetterChannel", failHandled.isDeadLetterChannel()); + eventMap.put("failureHandler", failHandled.getFailureHandler() == null ? 
"null" + : failHandled.getFailureHandler().getClass().getName()); + post = true; + } + if (event instanceof ExchangeRedeliveryEvent && !isIgnoreExchangeEvents() && !isIgnoreExchangeRedeliveryEvents()) { + ExchangeRedeliveryEvent redelivery = (ExchangeRedeliveryEvent) event; + eventMap.put("redeliveryAttempt", redelivery.getAttempt()); + post = true; + } + if (event instanceof RouteStartedEvent && !isIgnoreRouteEvents()) { + RouteStartedEvent route = (RouteStartedEvent) event; + eventMap.put("routeId", route.getRoute().getId()); + eventMap.put("camelContextName", route.getRoute().getRouteContext().getCamelContext().getName()); + post = true; + } + if (event instanceof RouteAddedEvent && !isIgnoreRouteEvents()) { + RouteAddedEvent route = (RouteAddedEvent) event; + eventMap.put("routeId", route.getRoute().getId()); + eventMap.put("camelContextName", route.getRoute().getRouteContext().getCamelContext().getName()); + post = true; + } + if (event instanceof RouteRemovedEvent && !isIgnoreRouteEvents()) { + RouteRemovedEvent route = (RouteRemovedEvent) event; + eventMap.put("routeId", route.getRoute().getId()); + eventMap.put("camelContextName", route.getRoute().getRouteContext().getCamelContext().getName()); + post = true; + } + if (event instanceof RouteStoppedEvent && !isIgnoreRouteEvents()) { + RouteStoppedEvent route = (RouteStoppedEvent) event; + eventMap.put("routeId", route.getRoute().getId()); + eventMap.put("camelContextName", route.getRoute().getRouteContext().getCamelContext().getName()); + post = true; + } + if (event instanceof ServiceStartupFailureEvent && !isIgnoreServiceEvents()) { + ServiceStartupFailureEvent service = (ServiceStartupFailureEvent) event; + eventMap.put("serviceName", service.getService().getClass().getName()); + eventMap.put("camelContextName", service.getContext().getName()); + eventMap.put("cause", service.getCause().toString()); + post = true; + } + if (event instanceof ServiceStopFailureEvent && !isIgnoreServiceEvents()) { + 
ServiceStopFailureEvent service = (ServiceStopFailureEvent) event; + eventMap.put("serviceName", service.getService().getClass().getName()); + eventMap.put("camelContextName", service.getContext().getName()); + eventMap.put("cause", service.getCause().toString()); + post = true; + } + if (event instanceof CamelContextResumedEvent && !isIgnoreCamelContextEvents()) { + CamelContextResumedEvent context = (CamelContextResumedEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextResumeFailureEvent && !isIgnoreCamelContextEvents()) { + CamelContextResumeFailureEvent context = (CamelContextResumeFailureEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + eventMap.put("cause", context.getCause().toString()); + post = true; + } + if (event instanceof CamelContextResumingEvent && !isIgnoreCamelContextEvents()) { + CamelContextResumingEvent context = (CamelContextResumingEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextStartedEvent && !isIgnoreCamelContextEvents()) { + CamelContextStartedEvent context = (CamelContextStartedEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextStartingEvent && !isIgnoreCamelContextEvents()) { + CamelContextStartingEvent context = (CamelContextStartingEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextStartupFailureEvent && !isIgnoreCamelContextEvents()) { + CamelContextStartupFailureEvent context = (CamelContextStartupFailureEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + eventMap.put("cause", context.getCause().toString()); + post = true; + } + if (event instanceof CamelContextStopFailureEvent && !isIgnoreCamelContextEvents()) { + 
CamelContextStopFailureEvent context = (CamelContextStopFailureEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + eventMap.put("cause", context.getCause().toString()); + post = true; + } + if (event instanceof CamelContextStoppedEvent && !isIgnoreCamelContextEvents()) { + CamelContextStoppedEvent context = (CamelContextStoppedEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextStoppingEvent && !isIgnoreCamelContextEvents()) { + CamelContextStoppingEvent context = (CamelContextStoppingEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextSuspendedEvent && !isIgnoreCamelContextEvents()) { + CamelContextSuspendedEvent context = (CamelContextSuspendedEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof CamelContextSuspendingEvent && !isIgnoreCamelContextEvents()) { + CamelContextSuspendingEvent context = (CamelContextSuspendingEvent) event; + eventMap.put("camelContextName", context.getContext().getName()); + post = true; + } + if (event instanceof ExchangeCompletedEvent && !isIgnoreExchangeEvents() && !isIgnoreExchangeCompletedEvent()) { + post = true; + } + if (event instanceof ExchangeCreatedEvent && !isIgnoreExchangeEvents() && !isIgnoreExchangeCreatedEvent()) { + post = true; + } + if (post) { + eventAdmin.postEvent(new Event("decanter/collect/camel/event", eventMap)); + } + } catch (Exception ex) { + LOG.warn("Failed to handle event", ex); + } + } + + protected void doStart() throws Exception { + } + + protected void doStop() throws Exception { + } + + private Map<String, Object> createEvent(EventObject event, Exchange exchange) throws UnknownHostException { + HashMap<String, Object> data = new HashMap<String, Object>(); + data.put("eventType", event.getClass().getName()); + data.put("type", 
"camelEvent"); + data.put("karafName", System.getProperty("karaf.name")); + data.put("hostAddress", InetAddress.getLocalHost().getHostAddress()); + data.put("hostName", InetAddress.getLocalHost().getHostName()); + data.put("timestamp", System.currentTimeMillis()); + + if (exchange == null) { + return data; + } + data.put("fromEndpointUri", exchange.getFromEndpoint() != null ? exchange.getFromEndpoint().getEndpointUri() : null); + data.put("previousNode", extractFromNode(exchange)); + data.put("toNode", extractToNode(exchange)); + data.put("exchangeId", exchange.getExchangeId()); + data.put("routeId", exchange.getFromRouteId()); + data.put("camelContextName", exchange.getContext().getName()); + data.put("shortExchangeId", extractShortExchangeId(exchange)); + data.put("exchangePattern", exchange.getPattern().toString()); + data.put("properties", exchange.getProperties()); + data.put("inHeaders", exchange.getIn().getHeaders()); + data.put("inBody", MessageHelper.extractBodyAsString(exchange.getIn())); + data.put("inBodyType", MessageHelper.getBodyTypeName(exchange.getIn())); + if (exchange.hasOut()) { + data.put("outHeaders", exchange.getOut().getHeaders()); + data.put("outBody", MessageHelper.extractBodyAsString(exchange.getOut())); + data.put("outBodyType", MessageHelper.getBodyTypeName(exchange.getOut())); + } + data.put("causedByException", extractCausedByException(exchange)); + if (extender != null) { + extender.extend(data, exchange); + } + return data; + } + + private static String extractShortExchangeId(Exchange exchange) { + return exchange.getExchangeId().substring(exchange.getExchangeId().indexOf("/") + 1); + } + + private static String extractFromNode(Exchange exchange) { + if (exchange.getUnitOfWork() == null) { + return null; + } + TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes(); + RouteNode last = traced.getSecondLastNode(); + return last != null ? 
last.getLabel(exchange) : null; + } + + private static String extractToNode(Exchange exchange) { + if (exchange.getUnitOfWork() == null) { + return null; + } + TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes(); + RouteNode last = traced.getLastNode(); + return last != null ? last.getLabel(exchange) : null; + } + + private static String extractCausedByException(Exchange exchange) { + Throwable cause = exchange.getException(); + if (cause == null) { + cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); + } + return (cause != null) ? cause.toString() : null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandler.java ---------------------------------------------------------------------- diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandler.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandler.java new file mode 100644 index 0000000..a1602ee --- /dev/null +++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandler.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.decanter.collector.camel; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RouteNode; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.interceptor.TraceEventHandler; +import org.apache.camel.processor.interceptor.TraceInterceptor; +import org.apache.camel.spi.TracedRouteNodes; +import org.apache.camel.util.MessageHelper; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +import java.net.InetAddress; +import java.util.HashMap; + +public class DecanterTraceEventHandler implements TraceEventHandler { + + private EventAdmin eventAdmin; + private DecanterCamelEventExtender extender = null; + + public DecanterTraceEventHandler() { + } + + public EventAdmin getEventAdmin() { + return eventAdmin; + } + + public void setExtender(DecanterCamelEventExtender extender) { + this.extender = extender; + } + + public void setEventAdmin(EventAdmin eventAdmin) { + this.eventAdmin = eventAdmin; + } + + @Override + public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception { + HashMap<String, Object> data = new HashMap<>(); + data.put("type", "camelTracer"); + data.put("karafName", System.getProperty("karaf.name")); + data.put("hostAddress", InetAddress.getLocalHost().getHostAddress()); + data.put("hostName", InetAddress.getLocalHost().getHostName()); + data.put("nodeId", node.getId()); + data.put("timestamp", System.currentTimeMillis()); + data.put("fromEndpointUri", exchange.getFromEndpoint() != null ? 
exchange.getFromEndpoint().getEndpointUri() : null); + data.put("previousNode", extractFromNode(exchange)); + data.put("toNode", extractToNode(exchange)); + data.put("exchangeId", exchange.getExchangeId()); + data.put("routeId", exchange.getFromRouteId()); + data.put("camelContextName", exchange.getContext().getName()); + data.put("shortExchangeId", extractShortExchangeId(exchange)); + data.put("exchangePattern", exchange.getPattern().toString()); + for (String property : exchange.getProperties().keySet()) { + if (property.startsWith("decanter.")) { + data.put(property.substring("decanter.".length()), exchange.getProperties().get(property)); + } + } + data.put("properties", exchange.getProperties().isEmpty() ? null : exchange.getProperties()); + for (String header : exchange.getIn().getHeaders().keySet()) { + if (header.startsWith("decanter.")) { + data.put(header.substring("decanter.".length()), exchange.getIn().getHeader(header)); + } + } + data.put("inHeaders", exchange.getIn().getHeaders().isEmpty() ? null : exchange.getIn().getHeaders()); + data.put("inBody", MessageHelper.extractBodyAsString(exchange.getIn())); + data.put("inBodyType", MessageHelper.getBodyTypeName(exchange.getIn())); + if (exchange.hasOut()) { + data.put("outHeaders", exchange.getOut().getHeaders().isEmpty() ? 
null : exchange.getOut().getHeaders()); + data.put("outBody", MessageHelper.extractBodyAsString(exchange.getOut())); + data.put("outBodyType", MessageHelper.getBodyTypeName(exchange.getOut())); + } + data.put("causedByException", extractCausedByException(exchange)); + if (extender != null) { + extender.extend(data, exchange); + } + Event event = new Event("decanter/collect/camel/tracer", data); + eventAdmin.postEvent(event); + } + + @Override + public Object traceExchangeIn(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception { + traceExchange(node, target, traceInterceptor, exchange); + return null; + } + + @Override + public void traceExchangeOut(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange, Object traceState) throws Exception { + traceExchange(node, target, traceInterceptor, exchange); + } + + private static String extractShortExchangeId(Exchange exchange) { + return exchange.getExchangeId().substring(exchange.getExchangeId().indexOf("/") + 1); + } + + private static String extractFromNode(Exchange exchange) { + if (exchange.getUnitOfWork() == null) { + return null; + } + TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes(); + RouteNode last = traced.getSecondLastNode(); + return last != null ? last.getLabel(exchange) : null; + } + + private static String extractToNode(Exchange exchange) { + if (exchange.getUnitOfWork() == null) { + return null; + } + TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes(); + RouteNode last = traced.getLastNode(); + return last != null ? last.getLabel(exchange) : null; + } + + private static String extractCausedByException(Exchange exchange) { + Throwable cause = exchange.getException(); + if (cause == null) { + cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); + } + return (cause != null) ? 
cause.toString() : null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java ---------------------------------------------------------------------- diff --git a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java new file mode 100644 index 0000000..0a04e4e --- /dev/null +++ b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.karaf.decanter.collector.camel; + +import org.apache.camel.Exchange; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.management.event.*; +import org.junit.Assert; +import org.junit.Test; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DecanterEventNotifierTest { + + @Test + public void testEventNotifier() throws Exception { + DispatcherMock eventAdmin = new DispatcherMock(); + DecanterEventNotifier notifier = new DecanterEventNotifier(); + notifier.setEventAdmin(eventAdmin); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + camelContext.getManagementStrategy().addEventNotifier(notifier); + camelContext.start(); + + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + producerTemplate.sendBodyAndHeader("direct:start", "TEST", "foo", "bar"); + + Assert.assertEquals(10, eventAdmin.getPostEvents().size()); + + Event camelContextStartingEvent = eventAdmin.getPostEvents().get(0); + Assert.assertEquals("test-context", camelContextStartingEvent.getProperty("camelContextName")); + Assert.assertEquals(CamelContextStartingEvent.class.getName(), camelContextStartingEvent.getProperty("eventType")); + Assert.assertEquals("camelEvent", camelContextStartingEvent.getProperty("type")); + + Event routeAddedEvent = eventAdmin.getPostEvents().get(1); + Assert.assertEquals("test-context", routeAddedEvent.getProperty("camelContextName")); + Assert.assertEquals("test-route", routeAddedEvent.getProperty("routeId")); + 
Assert.assertEquals(RouteAddedEvent.class.getName(), routeAddedEvent.getProperty("eventType")); + Assert.assertEquals("camelEvent", routeAddedEvent.getProperty("type")); + + Event routeStartedEvent = eventAdmin.getPostEvents().get(2); + Assert.assertEquals("test-context", routeStartedEvent.getProperty("camelContextName")); + Assert.assertEquals("test-route", routeStartedEvent.getProperty("routeId")); + Assert.assertEquals(RouteStartedEvent.class.getName(), routeStartedEvent.getProperty("eventType")); + Assert.assertEquals("camelEvent", routeStartedEvent.getProperty("type")); + + Event camelContextStartedEvent = eventAdmin.getPostEvents().get(3); + Assert.assertEquals("test-context", camelContextStartedEvent.getProperty("camelContextName")); + Assert.assertEquals(CamelContextStartedEvent.class.getName(), camelContextStartedEvent.getProperty("eventType")); + Assert.assertEquals("camelEvent", camelContextStartedEvent.getProperty("type")); + + Event exchangeSendingEvent = eventAdmin.getPostEvents().get(4); + Assert.assertEquals("test-context", exchangeSendingEvent.getProperty("camelContextName")); + Assert.assertEquals(ExchangeSendingEvent.class.getName(), exchangeSendingEvent.getProperty("eventType")); + Assert.assertEquals("camelEvent", exchangeSendingEvent.getProperty("type")); + Assert.assertEquals("InOnly", exchangeSendingEvent.getProperty("exchangePattern")); + Assert.assertEquals("direct://start", exchangeSendingEvent.getProperty("sendingToEndpointUri")); + Assert.assertEquals("TEST", exchangeSendingEvent.getProperty("inBody")); + + Event exchangeCreatedEvent = eventAdmin.getPostEvents().get(5); + Assert.assertEquals("test-context", exchangeCreatedEvent.getProperty("camelContextName")); + Assert.assertEquals(ExchangeCreatedEvent.class.getName(), exchangeCreatedEvent.getProperty("eventType")); + Assert.assertEquals("camelEvent", exchangeCreatedEvent.getProperty("type")); + Assert.assertEquals("InOnly", exchangeCreatedEvent.getProperty("exchangePattern")); + 
Assert.assertEquals("TEST", exchangeCreatedEvent.getProperty("inBody")); + } + + @Test + public void testCamelContextFilter() throws Exception { + DispatcherMock eventAdmin = new DispatcherMock(); + DecanterEventNotifier notifier = new DecanterEventNotifier(); + notifier.setEventAdmin(eventAdmin); + notifier.setCamelContextMatcher("foo"); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + camelContext.getManagementStrategy().addEventNotifier(notifier); + camelContext.start(); + + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + producerTemplate.sendBodyAndHeader("direct:start", "TEST", "foo", "bar"); + + Assert.assertEquals(0, eventAdmin.getPostEvents().size()); + } + + @Test + public void testRouteIdFilter() throws Exception { + DispatcherMock eventAdmin = new DispatcherMock(); + DecanterEventNotifier notifier = new DecanterEventNotifier(); + notifier.setEventAdmin(eventAdmin); + notifier.setCamelContextMatcher(".*"); + notifier.setRouteMatcher("foo"); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + camelContext.getManagementStrategy().addEventNotifier(notifier); + camelContext.start(); + + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + producerTemplate.sendBodyAndHeader("direct:start", "TEST", "foo", "bar"); + + Assert.assertEquals(4, eventAdmin.getPostEvents().size()); + } + + @Test + public void testIgnoredEvents() throws Exception { + DispatcherMock eventAdmin = new 
DispatcherMock(); + DecanterEventNotifier notifier = new DecanterEventNotifier(); + notifier.setEventAdmin(eventAdmin); + notifier.setIgnoreCamelContextEvents(true); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + camelContext.getManagementStrategy().addEventNotifier(notifier); + camelContext.start(); + + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + producerTemplate.sendBodyAndHeader("direct:start", "TEST", "foo", "bar"); + + Assert.assertEquals(8, eventAdmin.getPostEvents().size()); + } + + @Test + public void testExtender() throws Exception { + DispatcherMock eventAdmin = new DispatcherMock(); + DecanterEventNotifier notifier = new DecanterEventNotifier(); + notifier.setIgnoreCamelContextEvents(true); + notifier.setIgnoreRouteEvents(true); + notifier.setEventAdmin(eventAdmin); + notifier.setExtender(new TestExtender()); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + camelContext.getManagementStrategy().addEventNotifier(notifier); + camelContext.start(); + + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + producerTemplate.sendBodyAndHeader("direct:start", "TEST", "foo", "bar"); + + Assert.assertEquals(6, eventAdmin.getPostEvents().size()); + + Assert.assertEquals("test", eventAdmin.getPostEvents().get(0).getProperty("extender-test")); + } + + private class DispatcherMock implements EventAdmin { + + private List<Event> postEvents = new ArrayList<>(); + private List<Event> 
sendEvents = new ArrayList<>(); + + @Override + public void postEvent(Event event) { + postEvents.add(event); + } + + @Override + public void sendEvent(Event event) { + System.out.println("SEND EVENT"); + sendEvents.add(event); + } + + public List<Event> getPostEvents() { + return postEvents; + } + + public List<Event> getSendEvents() { + return sendEvents; + } + } + + private class TestExtender implements DecanterCamelEventExtender { + + @Override + public void extend(Map<String, Object> decanterData, Exchange camelExchange) { + decanterData.put("extender-test", "test"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandlerTest.java ---------------------------------------------------------------------- diff --git a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandlerTest.java b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandlerTest.java new file mode 100644 index 0000000..7d8aa6f --- /dev/null +++ b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterTraceEventHandlerTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.decanter.collector.camel; + +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.processor.interceptor.Tracer; +import org.apache.karaf.decanter.marshaller.json.JsonMarshaller; +import org.junit.Assert; +import org.junit.Test; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DecanterTraceEventHandlerTest { + + @Test + public void testTracer() throws Exception { + JsonMarshaller marshaller = new JsonMarshaller(); + + DispatcherMock eventAdmin = new DispatcherMock(); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + Tracer tracer = new Tracer(); + tracer.setEnabled(true); + tracer.setTraceOutExchanges(true); + tracer.setLogLevel(LoggingLevel.OFF); + DecanterTraceEventHandler handler = new DecanterTraceEventHandler(); + handler.setEventAdmin(eventAdmin); + tracer.addTraceHandler(handler); + camelContext.setTracing(true); + camelContext.setDefaultTracer(tracer); + camelContext.start(); + + ProducerTemplate template = camelContext.createProducerTemplate(); + template.sendBodyAndHeader("direct:start", "TEST", "header", "test"); + + for (Event event : eventAdmin.getPostEvents()) { + String jsonString = marshaller.marshal(event); + JsonReader jsonReader = 
Json.createReader(new StringReader(jsonString)); + JsonObject rootObject = jsonReader.readObject(); + Assert.assertEquals("InOnly", rootObject.getString("exchangePattern")); + Assert.assertEquals("camelTracer", rootObject.getString("type")); + Assert.assertEquals("log://foo", rootObject.getString("toNode")); + Assert.assertEquals("test-route", rootObject.getString("routeId")); + Assert.assertEquals("test-context", rootObject.getString("camelContextName")); + JsonObject headersObject = rootObject.getJsonObject("inHeaders"); + Assert.assertEquals("test", headersObject.getString("header")); + Assert.assertEquals("direct://start", rootObject.getString("fromEndpointUri")); + Assert.assertEquals("TEST", rootObject.getString("inBody")); + JsonObject propertiesObject = rootObject.getJsonObject("properties"); + Assert.assertEquals("log://foo", propertiesObject.getString("CamelToEndpoint")); + Assert.assertEquals("String", rootObject.getString("inBodyType")); + Assert.assertEquals("decanter/collect/camel/tracer", rootObject.getString("event_topics")); + System.out.println(marshaller.marshal(event)); + } + } + + @Test + public void testTracerWithExtender() throws Exception { + DispatcherMock eventAdmin = new DispatcherMock(); + + TestExtender extender = new TestExtender(); + + RouteBuilder builder = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("test-route").to("log:foo"); + } + }; + DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext.setName("test-context"); + camelContext.addRoutes(builder); + Tracer tracer = new Tracer(); + tracer.setEnabled(true); + tracer.setTraceOutExchanges(true); + tracer.setLogLevel(LoggingLevel.OFF); + DecanterTraceEventHandler handler = new DecanterTraceEventHandler(); + handler.setExtender(extender); + handler.setEventAdmin(eventAdmin); + tracer.addTraceHandler(handler); + camelContext.setTracing(true); + camelContext.setDefaultTracer(tracer); + 
camelContext.start(); + + ProducerTemplate template = camelContext.createProducerTemplate(); + template.sendBodyAndHeader("direct:start", "TEST", "header", "test"); + + Assert.assertEquals(2, eventAdmin.getPostEvents().size()); + + Assert.assertEquals("test", eventAdmin.getPostEvents().get(0).getProperty("extender-test")); + Assert.assertEquals("test", eventAdmin.getPostEvents().get(1).getProperty("extender-test")); + } + + private class DispatcherMock implements EventAdmin { + + private List<Event> postEvents = new ArrayList<>(); + private List<Event> sendEvents = new ArrayList<>(); + + @Override + public void postEvent(Event event) { + postEvents.add(event); + } + + @Override + public void sendEvent(Event event) { + System.out.println("SEND EVENT"); + sendEvents.add(event); + } + + public List<Event> getPostEvents() { + return postEvents; + } + + public List<Event> getSendEvents() { + return sendEvents; + } + } + + private class TestExtender implements DecanterCamelEventExtender { + + @Override + public void extend(Map<String, Object> decanterData, Exchange camelExchange) { + decanterData.put("extender-test", "test"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/collector/camel/src/test/resources/sample.xml ---------------------------------------------------------------------- diff --git a/collector/camel/src/test/resources/sample.xml b/collector/camel/src/test/resources/sample.xml new file mode 100644 index 0000000..935e614 --- /dev/null +++ b/collector/camel/src/test/resources/sample.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> + + <reference id="eventAdmin" interface="org.osgi.service.event.EventAdmin"/> + + <bean id="traceHandler" class="org.apache.karaf.decanter.collector.camel.DecanterTraceEventHandler"> + <property name="eventAdmin" ref="eventAdmin"/> + </bean> + + <bean id="tracer" class="org.apache.camel.processor.interceptor.Tracer"> + 
<property name="traceHandler" ref="traceHandler"/> + <property name="enabled" value="true"/> + <property name="traceOutExchanges" value="true"/> + <property name="logLevel" value="OFF"/> + </bean> + + <camelContext trace="true" xmlns="http://camel.apache.org/schema/blueprint"> + <route id="test"> + <from uri="timer:fire?period=10000"/> + <setBody><constant>Hello World</constant></setBody> + <to uri="log:test"/> + </route> + </camelContext> + +</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/collector/pom.xml ---------------------------------------------------------------------- diff --git a/collector/pom.xml b/collector/pom.xml index a4a40d5..505e1e1 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -37,7 +37,7 @@ <module>jmx</module> <module>log</module> <module>file</module> - <module>camel-tracer</module> + <module>camel</module> <module>system</module> <module>log4j-socket</module> <module>rest</module> http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/manual/src/main/asciidoc/user-guide/collectors.adoc ---------------------------------------------------------------------- diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc index 7dcb32b..c236409 100644 --- a/manual/src/main/asciidoc/user-guide/collectors.adoc +++ b/manual/src/main/asciidoc/user-guide/collectors.adoc @@ -249,7 +249,7 @@ The ActiveMQ JMX collector is just a special configuration of the JMX collector. 
The `decanter-collector-jmx-activemq` feature installs the default JMX collector, with the specific ActiveMQ JMX configuration: ---- -karaf@root()> feature:install decanter-collector-activemq +karaf@root()> feature:install decanter-collector-jmx-activemq ---- This feature installs the same collector as the `decanter-collector-jmx`, but also adds the @@ -287,10 +287,10 @@ This configuration actually contains a filter to retrieve only the ActiveMQ JMX The Camel JMX collector is just a special configuration of the JMX collector. -The `decanter-collector-camel` feature installs the default JMX collector, with the specific Camel JMX configuration: +The `decanter-collector-jmx-camel` feature installs the default JMX collector, with the specific Camel JMX configuration: ---- -karaf@root()> feature:install decanter-collector-camel +karaf@root()> feature:install decanter-collector-jmx-camel ---- This feature installs the same collector as the `decanter-collector-jmx`, but also adds the @@ -324,17 +324,19 @@ object.name=org.apache.camel:context=*,type=routes,name=* This configuration actually contains a filter to retrieve only the Camel Routes JMX MBeans. -==== Camel Tracer +==== Camel Tracer & Notifier + +Decanter provides a Camel Tracer Handler that you can set on a Camel Tracer. It also provides a Camel Event Notifier. -The Camel Tracer provides a Camel Tracer Handler that you can set on a Camel Tracer. +===== Camel Tracer If you enable the tracer on a Camel route, all tracer events (exchanges on each step of the route) are sent to the appenders. -The `decanter-collector-camel-tracer` feature provides the Camel Tracer Handler: +The `decanter-collector-camel` feature provides the Camel Tracer Handler: ---- -karaf@root()> feature:install decanter-collector-camel-tracer +karaf@root()> feature:install decanter-collector-camel ---- Now, you can use the Decanter Camel Tracer Handler in a tracer that you can use in routes.
@@ -370,6 +372,25 @@ in the Camel Tracer: </blueprint> ---- +You can extend the Decanter event with any property using a custom `DecanterCamelEventExtender`: + +---- +public interface DecanterCamelEventExtender { + + void extend(Map<String, Object> decanterData, Exchange camelExchange); + +} +---- + +You can inject your extender using `setExtender(myExtender)` on the `DecanterTraceEventHandler`. Decanter will automatically +call your extender to populate extra properties. + +===== Camel Event Notifier + +Decanter also provides a `DecanterEventNotifier` that implements a Camel event notifier: http://camel.apache.org/eventnotifier-to-log-details-about-all-sent-exchanges.html + +It's very similar to the Decanter Camel Tracer. You can control the Camel contexts and routes for which you want to trap events. + ==== System The system collector is a polled collector (periodically executed by the Decanter Scheduler). http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/a3d3ea29/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ab91837..0babd1f 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ <baseline.skip>true</baseline.skip> <activemq.version>5.13.3</activemq.version> - <camel.version>2.13.2</camel.version> + <camel.version>2.16.2</camel.version> <cassandra.version>2.2.4</cassandra.version> <cassandra.driver.version>2.2.0-rc1</cassandra.driver.version> <elasticsearch1.version>1.7.4</elasticsearch1.version>
