Author: chirino
Date: Tue Aug 22 23:58:41 2006
New Revision: 433948
URL: http://svn.apache.org/viewvc?rev=433948&view=rev
Log:
Added a udp and multicast broker tracing plugins. If enabled on a broker they
can be used to monitor the broker without much network overhead.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
(with props)
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java?rev=433948&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
Tue Aug 22 23:58:41 2006
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.MulticastSocket;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * A Broker interceptor which allows you to trace all operations to a
Multicast socket.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 427613 $
+ */
+public class MulticastTraceBrokerPlugin extends UDPTraceBrokerPlugin {
+
+ private int timeToLive = 1;
+
+ public MulticastTraceBrokerPlugin() {
+ try {
+ destination = new URI("multicast://224.1.2.3:61616");
+ } catch (URISyntaxException wontHappen) {
+ }
+ }
+
+ protected DatagramSocket createSocket() throws IOException {
+ MulticastSocket s = new MulticastSocket();
+ s.setSendBufferSize(maxTraceDatagramSize);
+ s.setBroadcast(broadcast);
+ s.setLoopbackMode(true);
+ s.setTimeToLive(timeToLive);
+ return s;
+ }
+
+ public int getTimeToLive() {
+ return timeToLive;
+ }
+
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java?rev=433948&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
Tue Aug 22 23:58:41 2006
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.command.WireFormatFactory;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+
+/**
+ * A Broker interceptor which allows you to trace all operations to a UDP
socket.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 427613 $
+ */
+public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
+
+ protected WireFormat wireFormat;
+ protected WireFormatFactory wireFormatFactory;
+ protected int maxTraceDatagramSize = 1024*4;
+ protected URI destination;
+ protected DatagramSocket socket;
+
+ protected BrokerId brokerId;
+ protected SocketAddress address;
+ protected boolean broadcast;
+
+ public UDPTraceBrokerPlugin() {
+ try {
+ destination = new URI("udp://127.0.0.1:61616");
+ } catch (URISyntaxException wontHappen) {
+ }
+ }
+
+ public void start() throws Exception {
+ super.start();
+ if( getWireFormat() == null )
+ throw new IllegalArgumentException("Wireformat must be
specifed.");
+ if( address == null ) {
+ address = createSocketAddress(destination);
+ }
+ socket = createSocket();
+
+ brokerId = super.getBrokerId();
+ trace(new JournalTrace("START"));
+ }
+
+ protected DatagramSocket createSocket() throws IOException {
+ DatagramSocket s = new DatagramSocket();
+ s.setSendBufferSize(maxTraceDatagramSize);
+ s.setBroadcast(broadcast);
+ return s;
+ }
+
+ public void stop() throws Exception {
+ trace(new JournalTrace("STOP"));
+ socket.close();
+ super.stop();
+ }
+
+ private void trace(DataStructure command) throws IOException {
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream(maxTraceDatagramSize);
+ DataOutputStream out = new DataOutputStream(baos);
+ wireFormat.marshal(brokerId, out);
+ wireFormat.marshal(command, out);
+ out.close();
+ ByteSequence sequence = baos.toByteSequence();
+ DatagramPacket datagram = new DatagramPacket(
sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
+ socket.send(datagram);
+ }
+
+ public void send(ConnectionContext context, Message messageSend) throws
Exception {
+ trace(messageSend);
+ super.send(context, messageSend);
+ }
+
+ public void acknowledge(ConnectionContext context, MessageAck ack) throws
Exception {
+ trace(ack);
+ super.acknowledge(context, ack);
+ }
+
+ public WireFormat getWireFormat() {
+ if( wireFormat == null ) {
+ wireFormat = createWireFormat();
+ }
+ return wireFormat;
+ }
+
+ protected WireFormat createWireFormat() {
+ return getWireFormatFactory().createWireFormat();
+ }
+
+ public void setWireFormat(WireFormat wireFormat) {
+ this.wireFormat = wireFormat;
+ }
+
+ public WireFormatFactory getWireFormatFactory() {
+ if( wireFormatFactory == null ) {
+ wireFormatFactory = createWireFormatFactory();
+ }
+ return wireFormatFactory;
+ }
+
+ protected OpenWireFormatFactory createWireFormatFactory() {
+ OpenWireFormatFactory wf = new OpenWireFormatFactory();
+ wf.setCacheEnabled(false);
+ wf.setVersion(1);
+ wf.setTightEncodingEnabled(true);
+ wf.setSizePrefixDisabled(true);
+ return wf;
+ }
+
+ public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+ this.wireFormatFactory = wireFormatFactory;
+ }
+
+
+ protected SocketAddress createSocketAddress(URI location) throws
UnknownHostException {
+ InetAddress a = InetAddress.getByName(location.getHost());
+ int port = location.getPort();
+ return new InetSocketAddress(a, port);
+ }
+
+ public URI getDestination() {
+ return destination;
+ }
+
+ public void setDestination(URI destination) {
+ this.destination = destination;
+ }
+
+ public int getMaxTraceDatagramSize() {
+ return maxTraceDatagramSize;
+ }
+
+ public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
+ this.maxTraceDatagramSize = maxTraceDatagramSize;
+ }
+
+ public boolean isBroadcast() {
+ return broadcast;
+ }
+
+ public void setBroadcast(boolean broadcast) {
+ this.broadcast = broadcast;
+ }
+
+ public SocketAddress getAddress() {
+ return address;
+ }
+
+ public void setAddress(SocketAddress address) {
+ this.address = address;
+ }
+
+
+}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java?rev=433948&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
Tue Aug 22 23:58:41 2006
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ *
+ * @openwire:marshaller code="53"
+ * @version $Revision: 1.6 $
+ */
+public class JournalTrace implements DataStructure {
+
+ public static final byte DATA_STRUCTURE_TYPE=CommandTypes.JOURNAL_TRACE;
+
+ private String message;
+
+ public JournalTrace() {
+
+ }
+ public JournalTrace(String message) {
+ this.message = message;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getMessage() {
+ return message;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ public String toString() {
+ return IntrospectionSupport.toString(this, JournalTrace.class);
+ }
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
------------------------------------------------------------------------------
svn:executable = *