Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java?rev=755716&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Operation.java Wed Mar 18 19:53:51 2009 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store.kahadb; + +import java.io.IOException; + +import org.apache.activemq.broker.store.kahadb.data.KahaAddMessageCommand; +import org.apache.activemq.broker.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.page.Transaction; + +public abstract class Operation { + + protected final KahaDBStore store; + final Location location; + + public Operation(KahaDBStore store, Location location) { + this.store = store; + this.location = location; + } + + public Location getLocation() { + return location; + } + + abstract public void execute(Transaction tx) throws IOException; + + + public static class AddOpperation extends Operation { + final KahaAddMessageCommand command; + + public AddOpperation(KahaDBStore store, KahaAddMessageCommand command, Location location) { + super(store, location); + this.command = command; + } + + public void execute(Transaction tx) throws IOException { + store.upadateIndex(tx, command, location); + } + + public KahaAddMessageCommand getCommand() { + return command; + } + } + + public static class RemoveOpperation extends Operation { + + final KahaRemoveMessageCommand command; + + public RemoveOpperation(KahaDBStore store, KahaRemoveMessageCommand command, Location location) { + super(store, location); + this.command = command; + } + + public void execute(Transaction tx) throws IOException { + store.updateIndex(tx, command, location); + } + + public KahaRemoveMessageCommand getCommand() { + return command; + } + } +} \ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java?rev=755716&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java Wed Mar 18 19:53:51 2009 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store.kahadb; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.kahadb.index.BTreeIndex; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.page.Page; +import org.apache.kahadb.util.Marshaller; + +public class StoredDBState { + + protected final KahaDBStore store; + protected Page<StoredDBState> page; + protected int state; + protected BTreeIndex<String, StoredDestinationState> destinations; + protected Location lastUpdate; + protected Location firstInProgressTransactionLocation; + + public StoredDBState(KahaDBStore store) { + this.store = store; + } + + + public void read(DataInput is) throws IOException { + state = is.readInt(); + destinations = new BTreeIndex<String, StoredDestinationState>(store.pageFile, is.readLong()); + if (is.readBoolean()) { + lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); + } else { + lastUpdate = null; + } + if (is.readBoolean()) { + firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); + } else { + firstInProgressTransactionLocation = null; + } + } + + public void write(DataOutput os) throws IOException { + os.writeInt(state); + os.writeLong(destinations.getPageId()); + + if (lastUpdate != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); + } else { + os.writeBoolean(false); + } + + if (firstInProgressTransactionLocation != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); + } else { + os.writeBoolean(false); + } + } + + static public class DBStateMarshaller implements Marshaller<StoredDBState> { + private final KahaDBStore store; + + public DBStateMarshaller(KahaDBStore store) { + this.store = store; + } + + public Class<StoredDBState> getType() { + return StoredDBState.class; + } + + public StoredDBState readPayload(DataInput dataIn) throws IOException { + StoredDBState rc = new StoredDBState(this.store); + rc.read(dataIn); + return rc; + } + + public void writePayload(StoredDBState object, DataOutput dataOut) throws IOException { + object.write(dataOut); + } + } +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java?rev=755716&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java Wed Mar 18 19:53:51 2009 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store.kahadb; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.TreeMap; + +import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand; +import org.apache.kahadb.index.BTreeIndex; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.util.Marshaller; + +public class StoredDestinationState { + long nextMessageId; + BTreeIndex<Long, MessageKeys> orderIndex; + BTreeIndex<Location, Long> locationIndex; + BTreeIndex<String, Long> messageIdIndex; + + // These bits are only set for Topics + BTreeIndex<String, KahaSubscriptionCommand> subscriptions; + BTreeIndex<String, Long> subscriptionAcks; + HashMap<String, Long> subscriptionCursors; + TreeMap<Long, HashSet<String>> ackPositions; + + public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> { + private final KahaDBStore store; + + public StoredDestinationMarshaller(KahaDBStore store) { + this.store = store; + } + + public Class<StoredDestinationState> getType() { + return StoredDestinationState.class; + } + + public StoredDestinationState readPayload(DataInput dataIn) throws IOException { + StoredDestinationState value = new StoredDestinationState(); + value.orderIndex = new BTreeIndex<Long, MessageKeys>(store.pageFile, dataIn.readLong()); + value.locationIndex = new BTreeIndex<Location, Long>(store.pageFile, dataIn.readLong()); + value.messageIdIndex = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong()); + + if (dataIn.readBoolean()) { + value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(store.pageFile, dataIn.readLong()); + value.subscriptionAcks = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong()); + } + return value; + } + + public void writePayload(StoredDestinationState value, DataOutput dataOut) throws IOException { + dataOut.writeLong(value.orderIndex.getPageId()); + dataOut.writeLong(value.locationIndex.getPageId()); + dataOut.writeLong(value.messageIdIndex.getPageId()); + if (value.subscriptions != null) { + dataOut.writeBoolean(true); + dataOut.writeLong(value.subscriptions.getPageId()); + dataOut.writeLong(value.subscriptionAcks.getPageId()); + } else { + dataOut.writeBoolean(false); + } + } + } +} \ No newline at end of file
