Author: chirino
Date: Thu Nov 13 13:40:16 2008
New Revision: 713835
URL: http://svn.apache.org/viewvc?rev=713835&view=rev
Log:
Updating to work with the latest API changes done to the activemq-protobuf stuff
Added:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713835&r1=713834&r2=713835&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
Thu Nov 13 13:40:16 2008
@@ -31,7 +31,6 @@
import java.util.zip.Checksum;
import org.apache.activemq.Service;
-import org.apache.activemq.protobuf.ByteString;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
@@ -121,7 +120,7 @@
frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
PBJournalUpdate payload = new PBJournalUpdate();
payload.setLocation(convert(location));
- payload.setData(ByteString.copyFrom(sequence.getData(),
sequence.getOffset(), sequence.getLength()));
+ payload.setData(new
org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(),
sequence.getLength()));
frame.setPayload(payload);
for (ReplicationSession session : sessions) {
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java?rev=713835&r1=713834&r2=713835&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
Thu Nov 13 13:40:16 2008
@@ -39,7 +39,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.protobuf.ByteString;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -48,7 +48,6 @@
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
@@ -138,8 +137,8 @@
command.setMessageId(message.getMessageId().toString());
command.setTransactionInfo(
createTransactionInfo(message.getTransactionId()) );
- ByteSequence packet = wireFormat.marshal(message);
- command.setMessage(ByteString.copyFrom(packet.getData(),
packet.getOffset(), packet.getLength()));
+ org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(message);
+ command.setMessage(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
store(command, isSyncWrites() && message.isResponseRequired());
@@ -276,8 +275,8 @@
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setRetroactive(retroactive);
- ByteSequence packet = wireFormat.marshal(subscriptionInfo);
- command.setSubscriptionInfo(ByteString.copyFrom(packet.getData(),
packet.getOffset(), packet.getLength()));
+ org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(subscriptionInfo);
+ command.setSubscriptionInfo(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
store(command, isSyncWrites() && true);
}
@@ -521,8 +520,8 @@
} else {
XATransactionId t = (XATransactionId)txid;
KahaXATransactionId kahaTxId = new KahaXATransactionId();
-
kahaTxId.setBranchQualifier(ByteString.copyFrom(t.getBranchQualifier()));
-
kahaTxId.setGlobalTransactionId(ByteString.copyFrom(t.getGlobalTransactionId()));
+ kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
+ kahaTxId.setGlobalTransactionId(new
Buffer(t.getGlobalTransactionId()));
kahaTxId.setFormatId(t.getFormatId());
rc.setXaTransacitonId(kahaTxId);
}
Added:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java?rev=713835&view=auto
==============================================================================
---
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java
(added)
+++
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java
Thu Nov 13 13:40:16 2008
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.store.data.KahaAddMessageCommand;
+import org.apache.kahadb.store.data.KahaDestination;
+import org.apache.kahadb.store.data.KahaEntryType;
+import org.apache.kahadb.store.data.KahaDestination.DestinationType;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+
+public class PBMesssagesTest extends TestCase {
+
+ public void testKahaAddMessageCommand() throws IOException {
+
+ KahaAddMessageCommand expected = new KahaAddMessageCommand();
+ expected.setDestination(new
KahaDestination().setName("Foo").setType(DestinationType.QUEUE));
+ expected.setMessage(new Buffer(new byte[] {1,2,3,4,5,6} ));
+ expected.setMessageId("Hello World");
+
+ int size = expected.serializedSizeFramed();
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+ os.writeByte(expected.type().getNumber());
+ expected.writeFramed(os);
+ ByteSequence seq = os.toByteSequence();
+
+ DataByteArrayInputStream is = new DataByteArrayInputStream(seq);
+ KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+ JournalCommand message = (JournalCommand)type.createMessage();
+ message.mergeFramed(is);
+
+ assertEquals(expected, message);
+ }
+
+}