Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Tue Feb 17 15:12:52 2009 @@ -21,12 +21,14 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.impl.reliable.ReliableBuffer; -import org.apache.activeblaze.wire.AckData; import org.apache.activeblaze.wire.MessageType; -import org.apache.activeblaze.wire.NackData; import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.AckData.AckDataBean; +import org.apache.activeblaze.wire.NackData.NackDataBean; +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +59,7 @@ void processInBound(Packet packet) throws Exception { PacketData packetData = packet.getPacketData(); - MessageType type = MessageType.valueOf(packetData.getType()); + MessageType type = packetData.getType(); if (type == MessageType.CONTROL_DATA) { if (this.replayBuffer.isEmpty()) { // send back a control message @@ -107,19 +109,18 @@ } } else if (!packet.isReplayed() && !this.replayBuffer.isEmpty()) { // request the sequence - MessageType nackType = MessageType.NACK_DATA; - NackData nack = (NackData) nackType.createMessage(); + NackDataBean nack = new NackDataBean(); this.lock.lock(); try { nack.setStartSequence(this.lastSequence + 1); nack.setEndSequence(packet.getMessageSequence() - 1); nack.setSessionId(packet.getPacketData().getSessionId()); nack.setId(this.ackSequence.incrementAndGet()); - PacketData pd = new PacketData(); + PacketDataBean pd = new PacketDataBean(); pd.setResponseRequired(false); - pd.setPayload(nack.toFramedBuffer()); - pd.setType(nackType.getNumber()); - Packet nackPacket = new Packet(pd); + pd.setPayload(nack.freeze().toUnframedBuffer()); + pd.setType(MessageType.NACK_DATA); + Packet nackPacket = new Packet(pd.freeze()); nackPacket.setTo(this.peerAddress); this.swp.sendDownStream(nackPacket); LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence()); @@ -152,16 +153,15 @@ this.lock.lock(); try { this.bufferSize = 0; - MessageType type = MessageType.ACK_DATA; - AckData ack = (AckData) type.createMessage(); + AckDataBean ack = new AckDataBean(); ack.setStartSequence(this.firstSequence); ack.setEndSequence(this.lastSequence); ack.setId(this.ackSequence.incrementAndGet()); - PacketData pd = new PacketData(); + PacketDataBean pd = new PacketDataBean(); pd.setResponseRequired(false); - pd.setPayload(ack.toFramedBuffer()); - pd.setType(type.getNumber()); - ackPacket = new Packet(pd); + pd.setPayload(ack.freeze().toUnframedBuffer()); + pd.setType(MessageType.ACK_DATA); + ackPacket = new Packet(pd.freeze()); ackPacket.setTo(this.peerAddress); this.lastAckTime = System.currentTimeMillis(); this.firstSequence = this.lastSequence;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Tue Feb 17 15:12:52 2009 @@ -23,14 +23,16 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + import org.apache.activeblaze.BlazeNoRouteException; import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.impl.reliable.ReliableBuffer; -import org.apache.activeblaze.wire.AckData; -import org.apache.activeblaze.wire.ControlData; import org.apache.activeblaze.wire.MessageType; -import org.apache.activeblaze.wire.NackData; import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.AckData.AckDataBuffer; +import org.apache.activeblaze.wire.ControlData.ControlDataBean; +import org.apache.activeblaze.wire.NackData.NackDataBuffer; +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** @@ -66,8 +68,10 @@ * */ void processOutbound(final Packet packet) throws BlazeNoRouteException { - packet.getPacketData().setSessionId(this.sessionId); - packet.getPacketData().setMessageSequence(this.sendSequence.incrementAndGet()); + PacketDataBean bean = packet.getPacketData().copy(); + bean.setSessionId(this.sessionId); + bean.setMessageSequence(this.sendSequence.incrementAndGet()); + packet.setPacketData(bean.freeze()); this.lock.lock(); try { this.replayBuffer.addPacket(packet); @@ -92,10 +96,9 @@ Packet result = null; PacketData data = packet.getPacketData(); if (data != null) { - MessageType type = MessageType.valueOf(data.getType()); + MessageType type = data.getType(); if (type == MessageType.ACK_DATA) { - AckData ackData = (AckData) type.createMessage(); - ackData.mergeFramed(data.getPayload()); + AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload()); long start = ackData.getStartSequence(); long end = ackData.getEndSequence(); if (LOG.isDebugEnabled()) { @@ -119,8 +122,7 @@ } } else if (type == MessageType.NACK_DATA) { this.lastAckTime = System.currentTimeMillis(); - NackData nackData = (NackData) type.createMessage(); - nackData.mergeFramed(data.getPayload()); + NackDataBuffer nackData = NackDataBuffer.parseUnframed(data.getPayload()); this.lastAckId = nackData.getId(); LOG.debug(this + " Got Nack = " + nackData); // lookup any missed messages @@ -151,14 +153,13 @@ Packet ackPacket = null; this.lock.lock(); try { - MessageType type = MessageType.CONTROL_DATA; - ControlData control = (ControlData) type.createMessage(); + ControlDataBean control = new ControlDataBean(); control.setLastId(this.lastAckId); - PacketData pd = new PacketData(); + PacketDataBean pd = new PacketDataBean(); pd.setResponseRequired(false); - pd.setPayload(control.toFramedBuffer()); - pd.setType(type.getNumber()); - ackPacket = new Packet(pd); + pd.setPayload(control.freeze().toUnframedBuffer()); + pd.setType(MessageType.CONTROL_DATA); + ackPacket = new Packet(pd.freeze()); ackPacket.setTo(this.peerAddress); LOG.debug(this + " Sent Control message " + control); } finally { Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java Tue Feb 17 15:12:52 2009 @@ -21,6 +21,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activeblaze.impl.processor.DefaultChainedProcessor; import org.apache.activeblaze.impl.processor.Packet; @@ -106,10 +107,20 @@ } public void doStop() throws Exception { - super.doStop(); if (this.statusTimer != null) { - this.statusTimer.cancel(); + // Make sure we shutdown the timer before shutting down the down stream + // processors to avoid the timer getting errors. + final CountDownLatch done = new CountDownLatch(1); + this.statusTimer.schedule(new TimerTask(){ + @Override + public void run() { + statusTimer.cancel(); + done.countDown(); + }}, 0); + done.await(); + this.statusTimer=null; } + super.doStop(); } void sendDownStream(Packet packet) throws Exception { Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Tue Feb 17 15:12:52 2009 @@ -258,7 +258,12 @@ } catch (InterruptedException e1) { // we've stopped } catch (Exception e) { - LOG.error("Caught an exception processing a packet: " + packet, e); + String value=""; + try { + value = packet.toString(); + } catch (Throwable ignore) { + } + LOG.error("Caught an exception processing a packet: " + value, e); stopInternal(); } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Tue Feb 17 15:12:52 2009 @@ -22,9 +22,10 @@ import java.net.MulticastSocket; import java.net.NetworkInterface; import java.net.SocketAddress; + import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.impl.processor.Packet; -import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.PacketData.PacketDataBuffer; /** * Multicast transport @@ -72,7 +73,7 @@ DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length); this.socket.receive(dp); if (dp.getLength() > 0) { - PacketData data = PacketData.parseFramed(dp.getData()); + PacketDataBuffer data = PacketDataBuffer.parseFramed(dp.getData()); SocketAddress address = dp.getSocketAddress(); Packet packet = new Packet(address, data); if (!isEnableAudit() || !this.audit.isDuplicate(packet)) { Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Feb 17 15:12:52 2009 @@ -25,15 +25,18 @@ import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.Map; + import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeNoRouteException; import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.util.IOUtils; import org.apache.activeblaze.util.LRUCache; import org.apache.activeblaze.util.SendRequest; -import org.apache.activeblaze.wire.AckData; import org.apache.activeblaze.wire.MessageType; import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.AckData.AckDataBean; +import org.apache.activeblaze.wire.PacketData.PacketDataBean; +import org.apache.activeblaze.wire.PacketData.PacketDataBuffer; import org.apache.activemq.protobuf.Buffer; /** @@ -47,7 +50,7 @@ private ByteBuffer outBuffer; - private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>( + private Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>( 1000); public void doInit() throws Exception { @@ -96,12 +99,11 @@ buffer.flip(); while (buffer.remaining() > 0) { InputStream stream = IOUtils.getByteBufferInputStream(buffer); - PacketData data = PacketData.parseFramed(stream); + PacketDataBuffer data = PacketDataBuffer.parseFramed(stream); stream.close(); if (data.getResponse()) { synchronized (this.messageRequests) { - SendRequest request = this.messageRequests.remove(data - .getCorrelationId()); + SendRequest<PacketDataBuffer> request = this.messageRequests.remove(data.getCorrelationId()); if (request != null) { request.put(data.getMessageId(), data); } @@ -124,12 +126,11 @@ public void downStream(Packet packet) throws Exception { ByteBuffer buffer = this.outBuffer; if (isStarted()) { - SendRequest request = null; + SendRequest<PacketDataBuffer> request = null; if (packet.isResponseRequired()) { synchronized (this.messageRequests) { - request = new SendRequest(); - this.messageRequests.put(packet.getPacketData() - .getMessageId(), request); + request = new SendRequest<PacketDataBuffer>(); + this.messageRequests.put(packet.getPacketData().getMessageId(), request); } } synchronized (buffer) { @@ -159,18 +160,17 @@ } private Packet createAckPacket(PacketData data) { - MessageType type = MessageType.ACK_DATA; - AckData ackData = (AckData) type.createMessage(); + AckDataBean ackData = new AckDataBean(); ackData.setSessionId(data.getSessionId()); ackData.setStartSequence(data.getMessageSequence()); ackData.setEndSequence(data.getMessageSequence()); - PacketData pd = new PacketData(); + PacketDataBean pd = new PacketDataBean(); pd.setResponseRequired(false); pd.setCorrelationId(data.getMessageId()); pd.setResponse(true); - pd.setPayload(ackData.toFramedBuffer()); - pd.setType(type.getNumber()); - Packet packet = new Packet(pd); + pd.setPayload(ackData.freeze().toUnframedBuffer()); + pd.setType(MessageType.ACK_DATA); + Packet packet = new Packet(pd.freeze()); return packet; } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java Tue Feb 17 15:12:52 2009 @@ -31,6 +31,7 @@ import org.apache.activeblaze.jms.BlazeJmsExceptionSupport; import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.wire.BlazeData; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBean; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; import org.apache.activemq.protobuf.BufferOutputStream; @@ -116,7 +117,7 @@ if (this.dataOut != null) { this.dataOut.close(); Buffer bs = this.bytesOut.toBuffer(); - getContent().setPayload(bs); + ((BlazeDataBean)getContent()).setPayload(bs); this.bytesOut = null; this.dataOut = null; } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java Tue Feb 17 15:12:52 2009 @@ -21,15 +21,17 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; + import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; + import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; -import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; -import org.apache.activeblaze.wire.BlazeData; -import org.apache.activeblaze.wire.MapData; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBean; +import org.apache.activeblaze.wire.MapData.MapDataBean; +import org.apache.activeblaze.wire.MapData.MapDataBuffer; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.InvalidProtocolBufferException; @@ -111,13 +113,12 @@ public void storeContent() { super.storeContent(); if (getContent() != null && !this.map.isEmpty()) { - MapData mapData = new MapData(); + MapDataBean mapData = new MapDataBean(); for (Map.Entry<String, Object> entry : this.map.entrySet()) { marshallMap(mapData, entry.getKey().toString(), entry.getValue()); } - Buffer payload = mapData.toFramedBuffer(); - BlazeData data = getContent(); - data.setPayload(payload); + Buffer payload = mapData.freeze().toUnframedBuffer(); + ((BlazeDataBean)getContent()).setPayload(payload); } } @@ -129,7 +130,7 @@ protected void loadContent() { if (getContent() != null && this.map.isEmpty()) { try { - MapData mapData = MapData.parseFramed(getContent().getPayload()); + MapDataBuffer mapData = MapDataBuffer.parseUnframed(getContent().getPayload()); this.map = unmarshall(mapData); } catch (InvalidProtocolBufferException e) { throw new BlazeRuntimeException(e); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java Tue Feb 17 15:12:52 2009 @@ -17,18 +17,20 @@ package org.apache.activeblaze.jms.message; import java.util.Enumeration; + import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageFormatException; + import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeMessage; import org.apache.activeblaze.BlazeMessageFormatException; import org.apache.activeblaze.BlazeRuntimeException; import org.apache.activeblaze.jms.BlazeJmsDestination; import org.apache.activeblaze.util.Callback; -import org.apache.activeblaze.wire.BlazeData; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBean; /** * Implementation of a Jms Message @@ -99,7 +101,7 @@ * @see javax.jms.Message#clearBody() */ public void clearBody() throws JMSException { - BlazeData data = getContent(); + BlazeDataBean data = (BlazeDataBean) getContent(); if (data != null) { data.clearPayload(); } @@ -109,7 +111,7 @@ * @see javax.jms.Message#clearProperties() */ public void clearProperties() { - BlazeData data = getContent(); + BlazeDataBean data = (BlazeDataBean) getContent(); if (data != null) { data.clearMapData(); } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Tue Feb 17 15:12:52 2009 @@ -22,13 +22,15 @@ import java.io.InputStream; import java.io.ObjectOutputStream; import java.io.Serializable; + import javax.jms.JMSException; import javax.jms.ObjectMessage; + import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; import org.apache.activeblaze.jms.BlazeJmsExceptionSupport; -import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBean; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; import org.apache.activemq.protobuf.BufferOutputStream; @@ -98,7 +100,8 @@ objOut.reset(); objOut.close(); payload = os.toBuffer(); - getContent().setPayload(payload); + BlazeDataBean data = (BlazeDataBean) getContent(); + data.setPayload(payload); } catch (IOException ioe) { throw new RuntimeException(ioe.getMessage(), ioe); } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Tue Feb 17 15:12:52 2009 @@ -32,6 +32,7 @@ import org.apache.activeblaze.jms.BlazeJmsExceptionSupport; import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.wire.BlazeData; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBean; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; @@ -135,7 +136,8 @@ try { this.dataOut.close(); Buffer buffer = new Buffer(bytesOut.toByteArray()); - getContent().setPayload(buffer); + BlazeDataBean data = (BlazeDataBean) getContent(); + data.setPayload(buffer); this.bytesOut = null; this.dataOut = null; } catch (IOException ioe) { Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Tue Feb 17 15:12:52 2009 @@ -24,6 +24,7 @@ import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBean; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; import org.apache.activemq.protobuf.BufferOutputStream; @@ -74,7 +75,8 @@ BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10); DataOutputStream dataOut = new DataOutputStream(os); MarshallingSupport.writeUTF8(dataOut, this.text); - getContent().setPayload(os.toBuffer()); + BlazeDataBean data = (BlazeDataBean) getContent(); + data.setPayload(os.toBuffer()); dataOut.close(); } catch (IOException e) { throw new BlazeRuntimeException(e); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java Tue Feb 17 15:12:52 2009 @@ -17,8 +17,8 @@ package org.apache.activeblaze.util; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.protobuf.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -26,13 +26,13 @@ * state on a request * */ -public class SendRequest { +public class SendRequest<R> { private static final Log LOG = LogFactory.getLog(SendRequest.class); private final AtomicBoolean done = new AtomicBoolean(); - private Message<?> response; + private R response; private RequestCallback callback; - public Object get(long timeout) { + public R get(long timeout) { synchronized (this.done) { if (this.done.get() == false && this.response == null) { try { @@ -45,7 +45,7 @@ return this.response; } - public void put(Buffer id,Message<?> response) { + public void put(Buffer id, R response) { this.response = response; cancel(); RequestCallback callback = this.callback; Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original) +++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Feb 17 15:12:52 2009 @@ -19,8 +19,6 @@ option java_multiple_files = true; option optimize_for = SPEED; - - // We make use of the wonky comment style bellow because the following options // are not valid for protoc, but they are valid for the ActiveMQ proto compiler. // In the ActiveMQ proto compiler, comments terminate with the pipe character: | @@ -33,226 +31,209 @@ ACK_DATA = 3; NACK_DATA = 4; STATE_DATA = 5; - CONTROL_DATA =6; + CONTROL_DATA = 6; +} + +message PacketData { + optional bool responseRequired = 1; + optional bool reliable = 2; + optional bool response = 3; + optional bool replayed = 4; + optional MessageType type =5; + optional bytes producerId = 6; + optional int32 sessionId = 7; + optional int64 messageSequence = 8; + optional int32 numberOfParts= 9; + optional int32 partNumber= 10; + optional bytes payload= 11; + optional bytes messageId =12; + optional bytes correlationId = 13; +} + +message BlazeData { + optional bool persistent = 1; + optional int32 priority = 2; + optional int32 redeliveryCounter = 3; + optional int32 type =4; + optional int64 timestamp = 5; + optional int64 expiration = 6; + optional bytes messageId = 7; + optional bytes correlationId = 8; + optional bytes fromId =9; + optional bytes messageType = 10; + optional bytes payload = 11; + optional DestinationData destinationData = 12; + optional DestinationData replyToData = 13; + optional MapData mapData = 14; + optional bytes payload = 15; +} + +message AckData { + optional int64 id =1; + optional int64 startSequence =2; + optional int64 endSequence =3; + optional int64 sessionId = 4; +} + +message NackData { + optional int64 id =1; + optional int64 startSequence =2; + optional int64 endSequence =3; + optional int64 sessionId = 4; +} + +message ControlData { + optional int64 lastId =1; //last ack or nack id +} + +message DestinationData { + optional bytes name =1; + optional bool topic =2; + optional bool temporary=3; +} + +message SubscriptionData { + optional bool durable = 1; + optional bool noLocal = 2; + optional int32 weight = 3; + optional string channelName = 4; + optional string subscriberName = 5; + optional string selector = 6; + optional DestinationData destinationData = 7; +} + +message MemberData { + optional string id = 1; + optional string name = 2; + optional int64 startTime = 3; + optional int64 timeStamp = 4; + optional bytes inetAddress = 5; + optional int32 port = 6; + // a higher weight means this will be the master + optional int64 masterWeight = 7; + // if both weights are the same - the refined weight can be used + optional int64 refinedWeight = 8; + optional bool subscriptionsChanged = 9; + optional bool observer = 10; + optional bool lockedMaster = 11; + repeated bytes groups = 12; + repeated SubscriptionData subscriptionData = 13; +} + +message StateKeyData { + optional MemberData member =1; + optional string key = 2; + optional bool locked = 3; + optional bool removeOnExit = 4; + optional bool releaseLockOnExit = 5; + optional int64 expiration = 6; + optional int64 lockExpiration = 7; +} + +enum StateType { + INSERT = 1; + DELETE = 2; + SYNC = 3; +} + +message StateData { + optional StateKeyData keyData = 1; + optional bytes value =2; + optional bytes oldvalue =3; + optional bool mapUpdate = 4; + optional bool mapWrite = 5; + optional bool expired = 6; + optional bool lockExpired = 7; + optional bool lockUpdate = 8; + optional bool lockWrite = 9; + optional bool error = 10; + optional StateType stateType = 11; +} + +enum ElectionType { + ELECTION = 0; + ANSWER = 1; + MASTER = 2; +} + +message ElectionMessage { + optional MemberData member = 1; + optional ElectionType electionType = 2; } - message PacketData { - optional bool responseRequired = 1; - optional bool reliable = 2; - optional bool response = 3; - optional bool replayed = 4; - optional int32 type =5; - optional bytes producerId = 6; - optional int32 sessionId = 7; - optional int64 messageSequence = 8; - optional int32 numberOfParts= 9; - optional int32 partNumber= 10; - optional bytes payload= 11; - optional bytes messageId =12; - optional bytes correlationId = 13; - - } - - message BlazeData { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional bool persistent = 1; - optional int32 priority = 2; - optional int32 redeliveryCounter = 3; - optional int32 type =4; - optional int64 timestamp = 5; - optional int64 expiration = 6; - optional bytes messageId = 7; - optional bytes correlationId = 8; - optional bytes fromId =9; - optional bytes messageType = 10; - optional bytes payload = 11; - optional DestinationData destinationData = 12; - optional DestinationData replyToData = 13; - optional MapData mapData = 14; - optional bytes payload = 15; - - } - - message AckData { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional int64 id =1; - optional int64 startSequence =2; - optional int64 endSequence =3; - optional int64 sessionId = 4; - } - - - - message NackData { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional int64 id =1; - optional int64 startSequence =2; - optional int64 endSequence =3; - optional int64 sessionId = 4; - } - - message ControlData { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional int64 lastId =1; //last ack or nack id - } - - message DestinationData { - optional bytes name =1; - optional bool topic =2; - optional bool temporary=3; - } - - message SubscriptionData { - optional bool durable = 1; - optional bool noLocal = 2; - optional int32 weight = 3; - optional string channelName = 4; - optional string subscriberName = 5; - optional string selector = 6; - optional DestinationData destinationData = 7; - } - - message MemberData { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional string id = 1; - optional string name = 2; - optional int64 startTime = 3; - optional int64 timeStamp = 4; - optional bytes inetAddress = 5; - optional int32 port = 6; - //a higher weight means this will be the master - optional int64 masterWeight = 7; - //if both weights are the same - the refined - //weight can be used - optional int64 refinedWeight = 8; - optional bool subscriptionsChanged = 9; - optional bool observer = 10; - optional bool lockedMaster = 11; - repeated bytes groups = 12; - repeated SubscriptionData subscriptionData = 13; - } - - message StateKeyData { - optional MemberData member =1; - optional string key = 2; - optional bool locked = 3; - optional bool removeOnExit = 4; - optional bool releaseLockOnExit = 5; - optional int64 expiration = 6; - optional int64 lockExpiration = 7; - } - enum StateType { - INSERT = 1; - DELETE = 2; - SYNC = 3; - } - message StateData { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional StateKeyData keyData = 1; - optional bytes value =2; - optional bytes oldvalue =3; - optional bool mapUpdate = 4; - optional bool mapWrite = 5; - optional bool expired = 6; - optional bool lockExpired = 7; - optional bool lockUpdate = 8; - optional bool lockWrite = 9; - optional bool error = 10; - optional StateType stateType = 11; - - } - - enum ElectionType { - ELECTION = 0; - ANSWER = 1; - MASTER = 2; - } - message ElectionMessage { - //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; - //| option java_type_method = "MessageType"; - optional MemberData member = 1; - optional ElectionType electionType = 2; - } +/////////////////////////////////////////////////////////////////////// +// Properties / MapData +/////////////////////////////////////////////////////////////////////// + +message StringType { + optional string name = 1; + optional string value = 2; +} + +message BoolType { + optional string name = 1; + optional bool value = 2; +} + +message ByteType { + optional string name = 1; + optional int32 value = 2; +} + +message ShortType { + optional string name = 1; + optional int32 value = 2; +} + +message IntType { + optional string name = 1; + optional int32 value = 2; +} + +message LongType { + optional string name = 1; + optional int64 value = 2; +} + +message FloatType { + optional string name = 1; + optional float value = 2; +} - // Properties - message StringType { - optional string name = 1; - optional string value = 2; - } - - message BoolType { - optional string name = 1; - optional bool value = 2; - } - - message ByteType { - optional string name = 1; - optional int32 value = 2; - } - - message ShortType { - optional string name = 1; - optional int32 value = 2; - } - - message IntType { - optional string name = 1; - optional int32 value = 2; - } - - message LongType { - optional string name = 1; - optional int64 value = 2; - } - - message FloatType { - optional string name = 1; - optional float value = 2; - } - - message DoubleType { - optional string name = 1; - optional double value = 2; - } - - message CharType { - optional string name = 1; - optional string value = 2; - } - - message BytesType { - optional string name = 1; - optional bytes value = 2; - } - - message BufferType { - optional string name = 1; - optional bytes value = 2; - } +message DoubleType { + optional string name = 1; + optional double value = 2; +} + +message CharType { + optional string name = 1; + optional string value = 2; +} +message BytesType { + optional string name = 1; + optional bytes value = 2; +} +message BufferType { + optional string name = 1; + optional bytes value = 2; +} - message MapData { - optional string name=1[default = "DEFAULT"]; - repeated StringType stringType = 2; - repeated IntType intType = 3; - repeated BoolType boolType = 4; - repeated LongType longType = 5; - repeated DoubleType doubleType = 6; - repeated FloatType floatType = 7; - repeated ShortType shortType = 8; - repeated ByteType byteType = 9; - repeated CharType charType = 10; - repeated BytesType bytesType = 11; - repeated MapData mapType = 12; - repeated BufferType bufferType = 13; - - } +message MapData { + optional string name=1[default = "DEFAULT"]; + repeated StringType stringType = 2; + repeated IntType intType = 3; + repeated BoolType boolType = 4; + repeated LongType longType = 5; + repeated DoubleType doubleType = 6; + repeated FloatType floatType = 7; + repeated ShortType shortType = 8; + repeated ByteType byteType = 9; + repeated CharType charType = 10; + repeated BytesType bytesType = 11; + repeated MapData mapType = 12; + repeated BufferType bufferType = 13; +} \ No newline at end of file Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java Tue Feb 17 15:12:52 2009 @@ -355,7 +355,7 @@ Object value = state1.put("foo", "blob"); assertNull(value); value = state1.put("foo", "blah"); - assertEquals(value, "blob"); + assertEquals("blob", value); } public void testRemove() throws Exception { Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Feb 17 15:12:52 2009 @@ -119,7 +119,7 @@ reply.shutDown(); } - public void testSendRequestString() throws Exception { + public void testSendRequestString() throws Exception { String destination = "/test/foo"; final int number = 10; final List<BlazeMessage> requests = new ArrayList<BlazeMessage>(); Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Tue Feb 17 15:12:52 2009 @@ -17,8 +17,10 @@ package org.apache.activeblaze.impl.processor; import java.util.concurrent.atomic.AtomicBoolean; + import junit.framework.TestCase; -import org.apache.activeblaze.wire.PacketData; + +import org.apache.activeblaze.wire.PacketData.PacketDataBean; /** * Test some basics in ChainedProcessor @@ -72,7 +74,7 @@ A.setEnd(D); A.setEnd(target); A.start(); - Packet p = new Packet(new PacketData()); + Packet p = new Packet(new PacketDataBean().freeze()); D.downStream(p); assertTrue(test.get()); } @@ -93,7 +95,7 @@ A.setEnd(C); A.setEnd(D); A.start(); - Packet p = new Packet(new PacketData()); + Packet p = new Packet(new PacketDataBean().freeze()); D.upStream(p); assertTrue(test.get()); } Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java Tue Feb 17 15:12:52 2009 @@ -4,7 +4,8 @@ package org.apache.activeblaze.impl.processor; import junit.framework.TestCase; -import org.apache.activeblaze.wire.PacketData; + +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.activemq.protobuf.Buffer; /** @@ -13,10 +14,9 @@ */ public class CompressionProcessorTest extends TestCase { public void testProcessor() throws Exception { - Packet packet = new Packet(new PacketData()); byte[] d1 = new byte[1024]; Buffer payload = new Buffer(d1); - packet.getPacketData().setPayload(payload); + Packet packet = new Packet(new PacketDataBean().setPayload(payload).freeze()); TerminatedChainedProcessor test = new TerminatedChainedProcessor(); CompressionProcessor proc = new CompressionProcessor(); proc.setPrev(test); @@ -30,7 +30,7 @@ d2[i] = (byte) i; } payload = new Buffer(d2); - packet.getPacketData().setPayload(payload); + packet = new Packet(new PacketDataBean().setPayload(payload).freeze()); proc.downStream(packet.clone()); Packet result = test.getResult(); assertTrue(CompressionProcessor.isCompressed(result.getPacketData().getPayload())); Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java Tue Feb 17 15:12:52 2009 @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.List; + import junit.framework.TestCase; -import org.apache.activeblaze.wire.PacketData; + +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.activemq.protobuf.Buffer; /** @@ -28,13 +30,13 @@ */ public class FragmentationProcessorTest extends TestCase { public void testProcessor() throws Exception { - Packet packet = new Packet(new PacketData()); byte[] testData = new byte[1024 * 32]; for (int i = 0; i < testData.length; i++) { testData[i] = (byte) i; } Buffer payload = new Buffer(testData); - packet.getPacketData().setPayload(payload); + Packet packet = new Packet(new PacketDataBean().setPayload(payload).freeze()); + TerminatedChainedProcessor test = new TerminatedChainedProcessor(); FragmentationProcessor proc = new FragmentationProcessor(); proc.setPrev(test); Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Tue Feb 17 15:12:52 2009 @@ -17,7 +17,8 @@ package org.apache.activeblaze.impl.processor; import junit.framework.TestCase; -import org.apache.activeblaze.wire.PacketData; + +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.activemq.protobuf.Buffer; @@ -35,18 +36,18 @@ PacketAudit audit = new PacketAudit(); audit.start(); for (long i =0; i< audit.getMaxAuditDepth();i++) { - PacketData data = new PacketData(); + PacketDataBean data = new PacketDataBean(); data.setProducerId(new Buffer("fred")); data.setMessageSequence(i); - Packet packet = new Packet(data); + Packet packet = new Packet(data.freeze()); assertFalse(audit.isDuplicate(packet)); } for (long i =0; i< audit.getMaxAuditDepth();i++) { - PacketData data = new PacketData(); + PacketDataBean data = new PacketDataBean(); data.setProducerId(new Buffer("fred")); data.setMessageSequence(i); - Packet packet = new Packet(data); + Packet packet = new Packet(data.freeze()); assertTrue("Testing " + i,audit.isDuplicate(packet)); } } Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java Tue Feb 17 15:12:52 2009 @@ -23,12 +23,14 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import junit.framework.TestCase; + import org.apache.activeblaze.impl.processor.DefaultChainedProcessor; import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.impl.transport.UdpTransport; import org.apache.activeblaze.util.IdGenerator; -import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.activemq.protobuf.Buffer; /** @@ -93,7 +95,7 @@ receiver.setLocalURI(this.receiverURI); this.consumer.start(); for (int i = 0; i < number; i++) { - Packet packet = createPacket(this.to); + Packet packet = createPacket(this.to, false); this.producer.downStream(packet); } latch.await(10, TimeUnit.SECONDS); @@ -129,8 +131,7 @@ receiver.setLocalURI(this.receiverURI); this.consumer.start(); for (int i = 0; i < number; i++) { - Packet packet = createPacket(this.to); - packet.getPacketData().setResponseRequired(true); + Packet packet = createPacket(this.to, true); this.producer.downStream(packet); } latch.await(10, TimeUnit.SECONDS); @@ -146,12 +147,15 @@ } } - protected Packet createPacket(SocketAddress to) throws Exception { - PacketData data = new PacketData(); + protected Packet createPacket(SocketAddress to, boolean responseRequried) throws Exception { + PacketDataBean data = new PacketDataBean(); data.setMessageId(new Buffer(this.idGenerator.generateId())); Buffer payload = new Buffer(new byte[1024]); data.setPayload(payload); - Packet packet = new Packet(data); + if( responseRequried ) { + data.setResponseRequired(true); + } + Packet packet = new Packet(data.freeze()); packet.setTo(to); return packet; } Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Tue Feb 17 15:12:52 2009 @@ -18,11 +18,14 @@ import java.net.InetSocketAddress; import java.net.URI; + +import junit.framework.TestCase; + import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor; -import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.MessageType; +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.activemq.protobuf.Buffer; -import junit.framework.TestCase; /** * Test Multicast Transport @@ -44,14 +47,14 @@ receiver.start(); String payload = "test String"; Buffer duff = new Buffer("duff"); - PacketData packetData = new PacketData(); - packetData.setType(1); + PacketDataBean packetData = new PacketDataBean(); + packetData.setType(MessageType.MEMBER_DATA); packetData.setMessageId(new Buffer("foo")); packetData.setProducerId(duff); packetData.setSessionId(1); packetData.setMessageSequence(0); packetData.setPayload(new Buffer(payload)); - Packet packet = new Packet(packetData); + Packet packet = new Packet(packetData.freeze()); packet.setTo(to); sender.downStream(packet); Thread.sleep(500); Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=745113&r1=745112&r2=745113&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Tue Feb 17 15:12:52 2009 @@ -16,14 +16,14 @@ */ package org.apache.activeblaze.impl.transport; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.URI; + import junit.framework.TestCase; import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor; -import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.MessageType; +import org.apache.activeblaze.wire.PacketData.PacketDataBean; import org.apache.activemq.protobuf.Buffer; @@ -52,14 +52,14 @@ String payload = "test String"; Buffer duff = new Buffer("duff"); - PacketData packetData = new PacketData(); - packetData.setType(1); + PacketDataBean packetData = new PacketDataBean(); + packetData.setType(MessageType.MEMBER_DATA); packetData.setMessageId(new Buffer("foo")); packetData.setProducerId(duff); packetData.setSessionId(1); packetData.setMessageSequence(0); packetData.setPayload(new Buffer(payload)); - Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData); + Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData.freeze()); sender.downStream(packet);
