http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
deleted file mode 100644
index b3c5dc9..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-import javax.sql.rowset.serial.SerialBlob;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import 
org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.FileUtils;
-
-public class LocalDBPersistenceManager implements 
PersistenceManagerWithRangeScan {
-    private static final Logger logger = 
LoggerFactory.getLogger(LocalDBPersistenceManager.class);
-
-    static String connectionURL;
-
-    static {
-        try {
-            File tempDir = FileUtils.createTempDirectory("derby", null);
-
-            // Since derby needs to create it, I will have to delete it first
-            if (!tempDir.delete()) {
-                throw new IOException("Could not delete dir: " + 
tempDir.getAbsolutePath());
-            }
-            connectionURL = "jdbc:derby:" + tempDir.getAbsolutePath() + 
";create=true";
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private static final ThreadLocal<Connection> threadLocalConnection = new 
ThreadLocal<Connection>() {
-        @Override
-        protected Connection initialValue() {
-            try {
-                return DriverManager.getConnection(connectionURL);
-            } catch (SQLException e) {
-                logger.error("Could not connect to derby", e);
-                return null;
-            }
-        }
-    };
-
-    private static final ThreadLocal<MessageDigest> threadLocalDigest = new 
ThreadLocal<MessageDigest>() {
-        @Override
-        protected MessageDigest initialValue() {
-            try {
-                return MessageDigest.getInstance("MD5");
-            } catch (NoSuchAlgorithmException e) {
-                logger.error("Could not find MD5 hash", e);
-                return null;
-            }
-        }
-    };
-    static final String ID_FIELD_NAME = "id";
-    static final String MSG_FIELD_NAME = "msg";
-    static final String driver = "org.apache.derby.jdbc.EmbeddedDriver";
-
-    static final int SCAN_CHUNK = 1000;
-
-    /**
-     * Having trouble restarting the database multiple times from within the
-     * same jvm. Hence to facilitate units tests, we are just going to have a
-     * version number that we will append to every table name. This version
-     * number will be incremented in lieu of shutting down the database and
-     * restarting it, so that we get different table names, and it behaves like
-     * a brand new database
-     */
-    private int version = 0;
-
-    ConcurrentMap<ByteString, MessageSeqId> currTopicSeqIds = new 
ConcurrentHashMap<ByteString, MessageSeqId>();
-
-    static LocalDBPersistenceManager instance = new 
LocalDBPersistenceManager();
-
-    public static LocalDBPersistenceManager instance() {
-        return instance;
-    }
-
-    private LocalDBPersistenceManager() {
-
-        try {
-            Class.forName(driver).newInstance();
-            logger.info("Derby Driver loaded");
-        } catch (java.lang.ClassNotFoundException e) {
-            logger.error("Derby driver not found", e);
-        } catch (InstantiationException e) {
-            logger.error("Could not instantiate derby driver", e);
-        } catch (IllegalAccessException e) {
-            logger.error("Could not instantiate derby driver", e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        // do nothing
-    }
-
-    /**
-     * Ensures that at least the default seq-id exists in the map for the given
-     * topic. Checks for race conditions (.e.g, another thread inserts the
-     * default id before us), and returns the latest seq-id value in the map
-     *
-     * @param topic
-     * @return
-     */
-    private MessageSeqId ensureSeqIdExistsForTopic(ByteString topic) {
-        MessageSeqId presentSeqIdInMap = currTopicSeqIds.get(topic);
-
-        if (presentSeqIdInMap != null) {
-            return presentSeqIdInMap;
-        }
-
-        presentSeqIdInMap = 
MessageSeqId.newBuilder().setLocalComponent(0).build();
-        MessageSeqId oldSeqIdInMap = currTopicSeqIds.putIfAbsent(topic, 
presentSeqIdInMap);
-
-        if (oldSeqIdInMap != null) {
-            return oldSeqIdInMap;
-        }
-        return presentSeqIdInMap;
-
-    }
-
-    /**
-     * Adjust the current seq id of the topic based on the message we are about
-     * to publish. The local component of the current seq-id is always
-     * incremented by 1. For the other components, there are two cases:
-     *
-     * 1. If the message to be published doesn't have a seq-id (locally
-     * published messages), the other components are left as is.
-     *
-     * 2. If the message to be published has a seq-id, we take the max of the
-     * current one we have, and that in the message to be published.
-     *
-     * @param topic
-     * @param messageToPublish
-     * @return The value of the local seq-id obtained after incrementing the
-     *         local component. This value should be used as an id while
-     *         persisting to Derby
-     * @throws UnexpectedConditionException
-     */
-    private long adjustTopicSeqIdForPublish(ByteString topic, Message 
messageToPublish)
-            throws UnexpectedConditionException {
-        long retValue = 0;
-        MessageSeqId oldId;
-        MessageSeqId.Builder newIdBuilder = MessageSeqId.newBuilder();
-
-        do {
-            oldId = ensureSeqIdExistsForTopic(topic);
-
-            // Increment our own component by 1
-            retValue = oldId.getLocalComponent() + 1;
-            newIdBuilder.setLocalComponent(retValue);
-
-            if (messageToPublish.hasMsgId()) {
-                // take a region-wise max
-                MessageIdUtils.takeRegionMaximum(newIdBuilder, 
messageToPublish.getMsgId(), oldId);
-
-            } else {
-                
newIdBuilder.addAllRemoteComponents(oldId.getRemoteComponentsList());
-            }
-        } while (!currTopicSeqIds.replace(topic, oldId, newIdBuilder.build()));
-
-        return retValue;
-
-    }
-
-    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int 
skipAmount) {
-        return seqId + skipAmount;
-    }
-
-    public void persistMessage(PersistRequest request) {
-
-        Connection conn = threadLocalConnection.get();
-
-        Callback<MessageSeqId> callback = request.getCallback();
-        Object ctx = request.getCtx();
-        ByteString topic = request.getTopic();
-        Message message = request.getMessage();
-
-        if (conn == null) {
-            callback.operationFailed(ctx, new ServiceDownException("Not 
connected to derby"));
-            return;
-        }
-
-        long seqId;
-
-        try {
-            seqId = adjustTopicSeqIdForPublish(topic, message);
-        } catch (UnexpectedConditionException e) {
-            callback.operationFailed(ctx, e);
-            return;
-        }
-        PreparedStatement stmt;
-
-        boolean triedCreatingTable = false;
-        while (true) {
-            try {
-                message.getBody();
-                stmt = conn.prepareStatement("INSERT INTO " + 
getTableNameForTopic(topic) + " VALUES(?,?)");
-                stmt.setLong(1, seqId);
-                stmt.setBlob(2, new SerialBlob(message.toByteArray()));
-
-                int rowCount = stmt.executeUpdate();
-                stmt.close();
-                if (rowCount != 1) {
-                    logger.error("Unexpected number of affected rows from 
derby");
-                    callback.operationFailed(ctx, new 
ServiceDownException("Unexpected response from derby"));
-                    return;
-                }
-                break;
-            } catch (SQLException sqle) {
-                String theError = (sqle).getSQLState();
-                if (theError.equals("42X05") && !triedCreatingTable) {
-                    createTable(conn, topic);
-                    triedCreatingTable = true;
-                    continue;
-                }
-
-                logger.error("Error while executing derby insert", sqle);
-                callback.operationFailed(ctx, new ServiceDownException(sqle));
-                return;
-            }
-        }
-        callback.operationFinished(ctx, 
MessageIdUtils.mergeLocalSeqId(message, seqId).getMsgId());
-    }
-
-    /*
-     * This method does not throw an exception because another thread might
-     * sneak in and create the table before us
-     */
-    private void createTable(Connection conn, ByteString topic) {
-        Statement stmt = null;
-        try {
-            stmt = conn.createStatement();
-            String tableName = getTableNameForTopic(topic);
-            stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + 
" BIGINT NOT NULL CONSTRAINT ID_PK_"
-                    + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " 
BLOB(2M) NOT NULL)");
-        } catch (SQLException e) {
-            logger.debug("Could not create table", e);
-        } finally {
-            try {
-                if (stmt != null) {
-                    stmt.close();
-                }
-            } catch (SQLException e) {
-                logger.error("Error closing statement", e);
-            }
-        }
-    }
-
-    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) {
-        return ensureSeqIdExistsForTopic(topic);
-    }
-
-    public void scanSingleMessage(ScanRequest request) {
-        scanMessagesInternal(request.getTopic(), request.getStartSeqId(), 1, 
Long.MAX_VALUE, request.getCallback(),
-                             request.getCtx(), 1);
-        return;
-    }
-
-    public void scanMessages(RangeScanRequest request) {
-        scanMessagesInternal(request.getTopic(), request.getStartSeqId(), 
request.getMessageLimit(), request
-                             .getSizeLimit(), request.getCallback(), 
request.getCtx(), SCAN_CHUNK);
-        return;
-    }
-
-    private String getTableNameForTopic(ByteString topic) {
-        String src = (topic.toStringUtf8() + "_" + version);
-        threadLocalDigest.get().reset();
-        byte[] digest = threadLocalDigest.get().digest(src.getBytes(UTF_8));
-        BigInteger bigInt = new BigInteger(1,digest);
-        return String.format("TABLE_%032X", bigInt);
-    }
-
-    private void scanMessagesInternal(ByteString topic, long startSeqId, int 
messageLimit, long sizeLimit,
-                                      ScanCallback callback, Object ctx, int 
scanChunk) {
-
-        Connection conn = threadLocalConnection.get();
-
-        if (conn == null) {
-            callback.scanFailed(ctx, new ServiceDownException("Not connected 
to derby"));
-            return;
-        }
-
-        long currentSeqId;
-        currentSeqId = startSeqId;
-
-        PreparedStatement stmt = null;
-        try {
-            try {
-                stmt = conn.prepareStatement("SELECT * FROM " + 
getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
-                                             + " >= ?  AND " + ID_FIELD_NAME + 
" <= ?");
-
-            } catch (SQLException sqle) {
-                String theError = (sqle).getSQLState();
-                if (theError.equals("42X05")) {
-                    // No table, scan is over
-                    callback.scanFinished(ctx, 
ReasonForFinish.NO_MORE_MESSAGES);
-                    return;
-                } else {
-                    throw sqle;
-                }
-            }
-
-            int numMessages = 0;
-            long totalSize = 0;
-
-            while (true) {
-
-                stmt.setLong(1, currentSeqId);
-                stmt.setLong(2, currentSeqId + scanChunk);
-
-                if (!stmt.execute()) {
-                    String errorMsg = "Select query did not return a result 
set";
-                    logger.error(errorMsg);
-                    stmt.close();
-                    callback.scanFailed(ctx, new 
ServiceDownException(errorMsg));
-                    return;
-                }
-
-                ResultSet resultSet = stmt.getResultSet();
-
-                if (!resultSet.next()) {
-                    stmt.close();
-                    callback.scanFinished(ctx, 
ReasonForFinish.NO_MORE_MESSAGES);
-                    return;
-                }
-
-                do {
-
-                    long localSeqId = resultSet.getLong(1);
-
-                    Message.Builder messageBuilder = 
Message.newBuilder().mergeFrom(resultSet.getBinaryStream(2));
-
-                    // Merge in the local seq-id since that is not stored with
-                    // the message
-                    Message message = 
MessageIdUtils.mergeLocalSeqId(messageBuilder, localSeqId);
-
-                    callback.messageScanned(ctx, message);
-                    numMessages++;
-                    totalSize += message.getBody().size();
-
-                    if (numMessages > messageLimit) {
-                        stmt.close();
-                        callback.scanFinished(ctx, 
ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
-                        return;
-                    } else if (totalSize > sizeLimit) {
-                        stmt.close();
-                        callback.scanFinished(ctx, 
ReasonForFinish.SIZE_LIMIT_EXCEEDED);
-                        return;
-                    }
-
-                } while (resultSet.next());
-
-                currentSeqId += SCAN_CHUNK;
-            }
-        } catch (SQLException e) {
-            logger.error("SQL Exception", e);
-            callback.scanFailed(ctx, new ServiceDownException(e));
-            return;
-        } catch (IOException e) {
-            logger.error("Message stored in derby is not parseable", e);
-            callback.scanFailed(ctx, new ServiceDownException(e));
-            return;
-        } finally {
-            try {
-                if (stmt != null) {
-                    stmt.close();
-                }
-            } catch (SQLException e) {
-                logger.error("Error closing statement", e);
-            }
-        }
-    }
-
-    public void deliveredUntil(ByteString topic, Long seqId) {
-        // noop
-    }
-
-    public void consumedUntil(ByteString topic, Long seqId) {
-        Connection conn = threadLocalConnection.get();
-        if (conn == null) {
-            logger.error("Not connected to derby");
-            return;
-        }
-        PreparedStatement stmt = null;
-        try {
-            stmt = conn.prepareStatement("DELETE FROM " + 
getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
-                                         + " <= ?");
-            stmt.setLong(1, seqId);
-            int rowCount = stmt.executeUpdate();
-            if (logger.isDebugEnabled()) {
-              logger.debug("Deleted " + rowCount + " records for topic: " + 
topic.toStringUtf8()
-                  + ", seqId: " + seqId);
-            }
-        } catch (SQLException sqle) {
-            String theError = (sqle).getSQLState();
-            if (theError.equals("42X05")) {
-                logger.warn("Table for topic (" + topic + ") does not exist so 
no consumed messages to delete!");
-            } else
-                logger.error("Error while executing derby delete for consumed 
messages", sqle);
-        } finally {
-            try {
-                if (stmt != null) {
-                    stmt.close();
-                }
-            } catch (SQLException e) {
-                logger.error("Error closing statement", e);
-            }
-        }
-    }
-
-    public void setMessageBound(ByteString topic, Integer bound) {
-        // noop; Maybe implement later
-    }
-
-    public void clearMessageBound(ByteString topic) {
-        // noop; Maybe implement later
-    }
-
-    public void consumeToBound(ByteString topic) {
-        // noop; Maybe implement later
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        if (driver.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
-            boolean gotSQLExc = false;
-            // This is weird: on normal shutdown, it throws an exception
-            try {
-                
DriverManager.getConnection("jdbc:derby:;shutdown=true").close();
-            } catch (SQLException se) {
-                if (se.getSQLState().equals("XJ015")) {
-                    gotSQLExc = true;
-                }
-            }
-            if (!gotSQLExc) {
-                logger.error("Database did not shut down normally");
-            } else {
-                logger.info("Database shut down normally");
-            }
-        }
-        super.finalize();
-    }
-
-    public void reset() {
-        // just move the namespace over to the next one
-        version++;
-        currTopicSeqIds.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java
deleted file mode 100644
index f640723..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.Collection;
-import java.util.Map;
-
-public class MapMethods {
-
-    public static <K, V> V getAfterInsertingIfAbsent(Map<K, V> map, K key, 
Factory<V> valueFactory) {
-        V value = map.get(key);
-
-        if (value == null) {
-            value = valueFactory.newInstance();
-            map.put(key, value);
-        }
-
-        return value;
-    }
-
-    public static <K, V, Z extends Collection<V>> void addToMultiMap(Map<K, Z> 
map, K key, V value,
-            Factory<Z> valueFactory) {
-        Collection<V> collection = getAfterInsertingIfAbsent(map, key, 
valueFactory);
-
-        collection.add(value);
-
-    }
-
-    public static <K, V, Z extends Collection<V>> boolean 
removeFromMultiMap(Map<K, Z> map, K key, V value) {
-        Collection<V> collection = map.get(key);
-
-        if (collection == null) {
-            return false;
-        }
-
-        if (!collection.remove(value)) {
-            return false;
-        } else {
-            if (collection.isEmpty()) {
-                map.remove(key);
-            }
-            return true;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java
deleted file mode 100644
index d137fe6..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Encapsulates a request to persist a given message on a given topic. The
- * request is completed asynchronously, callback and context are provided
- *
- */
-public class PersistRequest {
-    ByteString topic;
-    Message message;
-    private Callback<PubSubProtocol.MessageSeqId> callback;
-    Object ctx;
-
-    public PersistRequest(ByteString topic, Message message, 
Callback<PubSubProtocol.MessageSeqId> callback, Object ctx) {
-        this.topic = topic;
-        this.message = message;
-        this.callback = callback;
-        this.ctx = ctx;
-    }
-
-    public ByteString getTopic() {
-        return topic;
-    }
-
-    public Message getMessage() {
-        return message;
-    }
-
-    public Callback<PubSubProtocol.MessageSeqId> getCallback() {
-        return callback;
-    }
-
-    public Object getCtx() {
-        return ctx;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
deleted file mode 100644
index a295fc7..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import com.google.protobuf.ByteString;
-import 
org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-
-/**
- * An implementation of this interface will persist messages in order and 
assign
- * a seqId to each persisted message. SeqId need not be a single number in
- * general. SeqId is opaque to all layers above {@link PersistenceManager}. 
Only
- * the {@link PersistenceManager} needs to understand the format of the seqId
- * and maintain it in such a way that there is a total order on the seqIds of a
- * topic.
- *
- */
-public interface PersistenceManager {
-
-    /**
-     * Executes the given persist request asynchronously. When done, the
-     * callback specified in the request object is called with the result of 
the
-     * operation set to the {@link LocalMessageSeqId} assigned to the persisted
-     * message.
-     */
-    public void persistMessage(PersistRequest request);
-
-    /**
-     * Get the seqId of the last message that has been persisted to the given
-     * topic. The returned seqId will be set as the consume position of any
-     * brand new subscription on this topic.
-     *
-     * Note that the return value may quickly become invalid because a
-     * {@link #persistMessage(String, PublishedMessage)} call from another
-     * thread succeeds. For us, the typical use case is choosing the consume
-     * position of a new subscriber. Since the subscriber need not receive all
-     * messages that are published while the subscribe call is in progress, 
such
-     * loose semantics from this method is acceptable.
-     *
-     * @param topic
-     * @return the seqId of the last persisted message.
-     * @throws ServerNotResponsibleForTopicException
-     */
-    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws 
ServerNotResponsibleForTopicException;
-
-    /**
-     * Executes the given scan request
-     *
-     */
-    public void scanSingleMessage(ScanRequest request);
-
-    /**
-     * Gets the next seq-id. This method should never block.
-     */
-    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int 
skipAmount);
-
-    /**
-     * Hint that the messages until the given seqId have been delivered and 
wont
-     * be needed unless there is a failure of some kind
-     */
-    public void deliveredUntil(ByteString topic, Long seqId);
-
-    /**
-     * Hint that the messages until the given seqId have been consumed by all
-     * subscribers to the topic and no longer need to be stored. The
-     * implementation classes can decide how and if they want to garbage 
collect
-     * and delete these older topic messages that are no longer needed.
-     *
-     * @param topic
-     *            Topic
-     * @param seqId
-     *            Message local sequence ID
-     */
-    public void consumedUntil(ByteString topic, Long seqId);
-
-    public void setMessageBound(ByteString topic, Integer bound);
-    public void clearMessageBound(ByteString topic);
-    public void consumeToBound(ByteString topic);
-
-    /**
-     * Stop persistence manager.
-     */
-    public void stop();
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
deleted file mode 100644
index f12174f..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-public interface PersistenceManagerWithRangeScan extends PersistenceManager {
-    /**
-     * Executes the given range scan request
-     *
-     * @param request
-     */
-    public void scanMessages(RangeScanRequest request);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
deleted file mode 100644
index 3ac324d..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import com.google.protobuf.ByteString;
-
-/**
- * Encapsulates a request to scan messages on the given topic starting from the
- * given seqId (included). A call-back {@link ScanCallback} is provided. As
- * messages are scanned, the relevant methods of the {@link ScanCallback} are
- * called. Two hints are provided as to when scanning should stop: in terms of
- * number of messages scanned, or in terms of the total size of messages
- * scanned. Scanning stops whenever one of these limits is exceeded. These
- * checks, especially the one about message size, are only approximate. The
- * {@link ScanCallback} used should be prepared to deal with more or less
- * messages scanned. If an error occurs during scanning, the
- * {@link ScanCallback} is notified of the error.
- *
- */
-public class RangeScanRequest {
-    ByteString topic;
-    long startSeqId;
-    int messageLimit;
-    long sizeLimit;
-    ScanCallback callback;
-    Object ctx;
-
-    public RangeScanRequest(ByteString topic, long startSeqId, int 
messageLimit, long sizeLimit, ScanCallback callback,
-                            Object ctx) {
-        this.topic = topic;
-        this.startSeqId = startSeqId;
-        this.messageLimit = messageLimit;
-        this.sizeLimit = sizeLimit;
-        this.callback = callback;
-        this.ctx = ctx;
-    }
-
-    public ByteString getTopic() {
-        return topic;
-    }
-
-    public long getStartSeqId() {
-        return startSeqId;
-    }
-
-    public int getMessageLimit() {
-        return messageLimit;
-    }
-
-    public long getSizeLimit() {
-        return sizeLimit;
-    }
-
-    public ScanCallback getCallback() {
-        return callback;
-    }
-
-    public Object getCtx() {
-        return ctx;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
deleted file mode 100644
index 48be3e8..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
+++ /dev/null
@@ -1,865 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.hedwig.exceptions.PubSubException;
-import 
org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.jmx.HedwigJMXService;
-import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
-import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-public class ReadAheadCache implements PersistenceManager, HedwigJMXService {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ReadAheadCache.class);
-
-    protected interface CacheRequest {
-        public void performRequest();
-    }
-
-    /**
-     * The underlying persistence manager that will be used for persistence and
-     * scanning below the cache
-     */
-    protected PersistenceManagerWithRangeScan realPersistenceManager;
-
-    /**
-     * The structure for the cache
-     */
-    protected ConcurrentMap<CacheKey, CacheValue> cache =
-        new ConcurrentHashMap<CacheKey, CacheValue>();
-
-    /**
-     * We also want to track the entries in seq-id order so that we can clean 
up
-     * entries after the last subscriber
-     */
-    protected ConcurrentMap<ByteString, SortedSet<Long>> orderedIndexOnSeqId =
-        new ConcurrentHashMap<ByteString, SortedSet<Long>>();
-
-    /**
-     * Partition Cache into Serveral Segments for simplify synchronization.
-     * Each segment maintains its time index and segment size.
-     */
-    static class CacheSegment {
-
-        /**
-         * We want to keep track of when entries were added in the cache, so 
that we
-         * can remove them in a FIFO fashion
-         */
-        protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new 
TreeMap<Long, Set<CacheKey>>();
-
-        /**
-         * We maintain an estimate of the current size of each cache segment,
-         * so that the thread know when to evict entries from cache segment.
-         */
-        protected AtomicLong presentSegmentSize = new AtomicLong(0);
-
-    }
-
-    /**
-     * We maintain an estimate of the current size of the cache, so that we 
know
-     * when to evict entries.
-     */
-    protected AtomicLong presentCacheSize = new AtomicLong(0);
-
-    /**
-     * Num pending requests.
-     */
-    protected AtomicInteger numPendingRequests = new AtomicInteger(0);
-
-    /**
-     * Cache segment for different threads
-     */
-    protected final ThreadLocal<CacheSegment> cacheSegment =
-        new ThreadLocal<CacheSegment>() {
-            @Override
-            protected CacheSegment initialValue() {
-                return new CacheSegment();
-            }
-        };
-
-    /**
-     * One instance of a callback that we will pass to the underlying
-     * persistence manager when asking it to persist messages
-     */
-    protected PersistCallback persistCallbackInstance = new PersistCallback();
-
-    /**
-     * 2 kinds of exceptions that we will use to signal error from readahead
-     */
-    protected NoSuchSeqIdException noSuchSeqIdExceptionInstance = new 
NoSuchSeqIdException();
-    protected ReadAheadException readAheadExceptionInstance = new 
ReadAheadException();
-
-    protected ServerConfiguration cfg;
-    // Boolean indicating if this thread should continue running. This is used
-    // when we want to stop the thread during a PubSubServer shutdown.
-    protected volatile boolean keepRunning = true;
-
-    protected final OrderedSafeExecutor cacheWorkers;
-    protected final int numCacheWorkers;
-    protected volatile long maxSegmentSize;
-    protected volatile long cacheEntryTTL;
-
-    // JMX Beans
-    ReadAheadCacheBean jmxCacheBean = null;
-
-    /**
-     * Constructor. Starts the cache maintainer thread
-     *
-     * @param realPersistenceManager
-     */
-    public ReadAheadCache(PersistenceManagerWithRangeScan 
realPersistenceManager, ServerConfiguration cfg) {
-        this.realPersistenceManager = realPersistenceManager;
-        this.cfg = cfg;
-        numCacheWorkers = cfg.getNumReadAheadCacheThreads();
-        cacheWorkers = OrderedSafeExecutor.newBuilder()
-                .name("ReadAheadCacheScheduler")
-                .numThreads(numCacheWorkers)
-                .build();
-        reloadConf(cfg);
-    }
-
-    /**
-     * Reload configuration
-     *
-     * @param conf
-     *          Server configuration object
-     */
-    protected void reloadConf(ServerConfiguration cfg) {
-        maxSegmentSize = cfg.getMaximumCacheSize() / numCacheWorkers;
-        cacheEntryTTL = cfg.getCacheEntryTTL();
-    }
-
-    public ReadAheadCache start() {
-        return this;
-    }
-
-    /**
-     * ========================================================================
-     * Methods of {@link PersistenceManager} that we will pass straight down to
-     * the real persistence manager.
-     */
-
-    @Override
-    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int 
skipAmount) {
-        return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, 
skipAmount);
-    }
-
-    @Override
-    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws 
ServerNotResponsibleForTopicException {
-        return realPersistenceManager.getCurrentSeqIdForTopic(topic);
-    }
-
-    /**
-     * ========================================================================
-     * Other methods of {@link PersistenceManager} that the cache needs to take
-     * some action on.
-     *
-     * 1. Persist: We pass it through to the real persistence manager but 
insert
-     * our callback on the return path
-     *
-     */
-    @Override
-    public void persistMessage(PersistRequest request) {
-        // make a new PersistRequest object so that we can insert our own
-        // callback in the middle. Assign the original request as the context
-        // for the callback.
-
-        PersistRequest newRequest = new PersistRequest(request.getTopic(), 
request.getMessage(),
-                persistCallbackInstance, request);
-        realPersistenceManager.persistMessage(newRequest);
-    }
-
-    /**
-     * The callback that we insert on the persist request return path. The
-     * callback simply forms a {@link PersistResponse} object and inserts it in
-     * the request queue to be handled serially by the cache maintainer thread.
-     *
-     */
-    public class PersistCallback implements 
Callback<PubSubProtocol.MessageSeqId> {
-
-        /**
-         * In case there is a failure in persisting, just pass it to the
-         * original callback
-         */
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            PersistRequest originalRequest = (PersistRequest) ctx;
-            Callback<PubSubProtocol.MessageSeqId> originalCallback = 
originalRequest.getCallback();
-            Object originalContext = originalRequest.getCtx();
-            originalCallback.operationFailed(originalContext, exception);
-        }
-
-        /**
-         * When the persist finishes, we first notify the original callback of
-         * success, and then opportunistically treat the message as if it just
-         * came in through a scan
-         */
-        @Override
-        public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId 
resultOfOperation) {
-            PersistRequest originalRequest = (PersistRequest) ctx;
-
-            // Lets call the original callback first so that the publisher can
-            // hear success
-            
originalRequest.getCallback().operationFinished(originalRequest.getCtx(), 
resultOfOperation);
-
-            // Original message that was persisted didn't have the local 
seq-id.
-            // Lets add that in
-            Message messageWithLocalSeqId = 
MessageIdUtils.mergeLocalSeqId(originalRequest.getMessage(),
-                                            
resultOfOperation.getLocalComponent());
-
-            // Now enqueue a request to add this newly persisted message to our
-            // cache
-            CacheKey cacheKey = new CacheKey(originalRequest.getTopic(), 
resultOfOperation.getLocalComponent());
-
-            enqueueWithoutFailureByTopic(cacheKey.getTopic(),
-                    new ScanResponse(cacheKey, messageWithLocalSeqId));
-        }
-
-    }
-
-    protected void enqueueWithoutFailureByTopic(ByteString topic, final 
CacheRequest obj) {
-        if (!keepRunning) {
-            return;
-        }
-        try {
-            numPendingRequests.incrementAndGet();
-            cacheWorkers.submitOrdered(topic, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    numPendingRequests.decrementAndGet();
-                    obj.performRequest();
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to submit cache request for topic " + 
topic.toStringUtf8() + " : ", ree);
-        }
-    }
-
-    /**
-     * Another method from {@link PersistenceManager}.
-     *
-     * 2. Scan - Since the scan needs to touch the cache, we will just enqueue
-     * the scan request and let the cache maintainer thread handle it.
-     */
-    @Override
-    public void scanSingleMessage(ScanRequest request) {
-        // Let the scan requests be serialized through the queue
-        enqueueWithoutFailureByTopic(request.getTopic(),
-                new ScanRequestWrapper(request));
-    }
-
-    /**
-     * Another method from {@link PersistenceManager}.
-     *
-     * 3. Enqueue the request so that the cache maintainer thread can delete 
all
-     * message-ids older than the one specified
-     */
-    @Override
-    public void deliveredUntil(ByteString topic, Long seqId) {
-        enqueueWithoutFailureByTopic(topic, new DeliveredUntil(topic, seqId));
-    }
-
-    /**
-     * Another method from {@link PersistenceManager}.
-     *
-     * Since this is a cache layer on top of an underlying persistence manager,
-     * we can just call the consumedUntil method there. The messages older than
-     * the latest one passed here won't be accessed anymore so they should just
-     * get aged out of the cache eventually. For now, there is no need to
-     * proactively remove those entries from the cache.
-     */
-    @Override
-    public void consumedUntil(ByteString topic, Long seqId) {
-        realPersistenceManager.consumedUntil(topic, seqId);
-    }
-
-    @Override
-    public void setMessageBound(ByteString topic, Integer bound) {
-        realPersistenceManager.setMessageBound(topic, bound);
-    }
-
-    @Override
-    public void clearMessageBound(ByteString topic) {
-        realPersistenceManager.clearMessageBound(topic);
-    }
-
-    @Override
-    public void consumeToBound(ByteString topic) {
-        realPersistenceManager.consumeToBound(topic);
-    }
-
-    /**
-     * Stop the readahead cache.
-     */
-    @Override
-    public void stop() {
-        try {
-            keepRunning = false;
-            cacheWorkers.shutdown();
-        } catch (Exception e) {
-            logger.warn("Failed to shut down cache workers : ", e);
-        }
-    }
-
-    /**
-     * The readahead policy is simple: We check if an entry already exists for
-     * the message being requested. If an entry exists, it means that either
-     * that message is already in the cache, or a read for that message is
-     * outstanding. In that case, we look a little ahead (by readAheadCount/2)
-     * and issue a range read of readAheadCount/2 messages. The idea is to
-     * ensure that the next readAheadCount messages are always available.
-     *
-     * @return the range scan that should be issued for read ahead
-     */
-    protected RangeScanRequest doReadAhead(ScanRequest request) {
-        ByteString topic = request.getTopic();
-        Long seqId = request.getStartSeqId();
-
-        int readAheadCount = cfg.getReadAheadCount();
-        // To prevent us from getting screwed by bad configuration
-        readAheadCount = Math.max(1, readAheadCount);
-
-        RangeScanRequest readAheadRequest = doReadAheadStartingFrom(topic, 
seqId, readAheadCount);
-
-        if (readAheadRequest != null) {
-            return readAheadRequest;
-        }
-
-        // start key was already there in the cache so no readahead happened,
-        // lets look a little beyond
-        seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, 
readAheadCount / 2);
-
-        readAheadRequest = doReadAheadStartingFrom(topic, seqId, 
readAheadCount / 2);
-
-        return readAheadRequest;
-    }
-
-    /**
-     * This method just checks if the provided seq-id already exists in the
-     * cache. If not, a range read of the specified amount is issued.
-     *
-     * @param topic
-     * @param seqId
-     * @param readAheadCount
-     * @return The range read that should be issued
-     */
-    protected RangeScanRequest doReadAheadStartingFrom(ByteString topic, long 
seqId, int readAheadCount) {
-
-        long startSeqId = seqId;
-        Queue<CacheKey> installedStubs = new LinkedList<CacheKey>();
-
-        int i = 0;
-
-        for (; i < readAheadCount; i++) {
-            CacheKey cacheKey = new CacheKey(topic, seqId);
-
-            // Even if a stub exists, it means that a scan for that is
-            // outstanding
-            if (cache.containsKey(cacheKey)) {
-                break;
-            }
-            CacheValue cacheValue = new CacheValue();
-            if (null != cache.putIfAbsent(cacheKey, cacheValue)) {
-                logger.warn("It is unexpected that more than one threads are 
adding message to cache key {}"
-                            +" at the same time.", cacheKey);
-            }
-
-            logger.debug("Adding cache stub for: {}", cacheKey);
-            installedStubs.add(cacheKey);
-
-            seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, 
1);
-        }
-
-        // so how many did we decide to readahead
-        if (i == 0) {
-            // no readahead, hence return false
-            return null;
-        }
-
-        long readAheadSizeLimit = cfg.getReadAheadSizeBytes();
-        ReadAheadScanCallback callback = new 
ReadAheadScanCallback(installedStubs, topic);
-        RangeScanRequest rangeScanRequest = new RangeScanRequest(topic, 
startSeqId, i, readAheadSizeLimit, callback,
-                null);
-
-        return rangeScanRequest;
-
-    }
-
-    /**
-     * This is the callback that is used for the range scans.
-     */
-    protected class ReadAheadScanCallback implements ScanCallback {
-        Queue<CacheKey> installedStubs;
-        ByteString topic;
-
-        /**
-         * Constructor
-         *
-         * @param installedStubs
-         *            The list of stubs that were installed for this range scan
-         * @param topic
-         */
-        public ReadAheadScanCallback(Queue<CacheKey> installedStubs, 
ByteString topic) {
-            this.installedStubs = installedStubs;
-            this.topic = topic;
-        }
-
-        @Override
-        public void messageScanned(Object ctx, Message message) {
-
-            // Any message we read is potentially useful for us, so lets first
-            // enqueue it
-            CacheKey cacheKey = new CacheKey(topic, 
message.getMsgId().getLocalComponent());
-            enqueueWithoutFailureByTopic(topic, new ScanResponse(cacheKey, 
message));
-
-            // Now lets see if this message is the one we were expecting
-            CacheKey expectedKey = installedStubs.peek();
-
-            if (expectedKey == null) {
-                // Was not expecting any more messages to come in, but they 
came
-                // in so we will keep them
-                return;
-            }
-
-            if (expectedKey.equals(cacheKey)) {
-                // what we got is what we expected, dequeue it so we get the
-                // next expected one
-                installedStubs.poll();
-                return;
-            }
-
-            // If reached here, what we scanned was not what we were expecting.
-            // This means that we have wrong stubs installed in the cache. We
-            // should remove them, so that whoever is waiting on them can 
retry.
-            // This shouldn't be happening usually
-            logger.warn("Unexpected message seq-id: " + 
message.getMsgId().getLocalComponent() + " on topic: "
-                        + topic.toStringUtf8() + " from readahead scan, was 
expecting seq-id: " + expectedKey.seqId
-                        + " topic: " + expectedKey.topic.toStringUtf8() + " 
installedStubs: " + installedStubs);
-            enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance);
-
-        }
-
-        @Override
-        public void scanFailed(Object ctx, Exception exception) {
-            enqueueDeleteOfRemainingStubs(exception);
-        }
-
-        @Override
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-            // If the scan finished because no more messages are present, its 
ok
-            // to leave the stubs in place because they will get filled in as
-            // new publishes happen. However, if the scan finished due to some
-            // other reason, e.g., read ahead size limit was reached, we want 
to
-            // delete the stubs, so that when the time comes, we can schedule
-            // another readahead request.
-            if (reason != ReasonForFinish.NO_MORE_MESSAGES) {
-                enqueueDeleteOfRemainingStubs(readAheadExceptionInstance);
-            }
-        }
-
-        private void enqueueDeleteOfRemainingStubs(Exception reason) {
-            CacheKey installedStub;
-            while ((installedStub = installedStubs.poll()) != null) {
-                enqueueWithoutFailureByTopic(installedStub.getTopic(),
-                        new ExceptionOnCacheKey(installedStub, reason));
-            }
-        }
-    }
-
-    protected static class HashSetCacheKeyFactory implements 
Factory<Set<CacheKey>> {
-        protected final static HashSetCacheKeyFactory instance = new 
HashSetCacheKeyFactory();
-
-        @Override
-        public Set<CacheKey> newInstance() {
-            return new HashSet<CacheKey>();
-        }
-    }
-
-    protected static class TreeSetLongFactory implements 
Factory<SortedSet<Long>> {
-        protected final static TreeSetLongFactory instance = new 
TreeSetLongFactory();
-
-        @Override
-        public SortedSet<Long> newInstance() {
-            return new TreeSet<Long>();
-        }
-    }
-
-    /**
-     * For adding the message to the cache, we do some bookeeping such as the
-     * total size of cache, order in which entries were added etc. If the size
-     * of the cache has exceeded our budget, old entries are collected.
-     *
-     * @param cacheKey
-     * @param message
-     */
-    protected void addMessageToCache(final CacheKey cacheKey,
-                                     final Message message, final long 
currTime) {
-        logger.debug("Adding msg {} to readahead cache", cacheKey);
-
-        CacheValue cacheValue;
-        if ((cacheValue = cache.get(cacheKey)) == null) {
-            cacheValue = new CacheValue();
-            CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue);
-            if (null != oldValue) {
-                logger.warn("Weird! Should not have two threads adding message 
to cache key {} at the same time.",
-                            cacheKey);
-                cacheValue = oldValue;
-            }
-        }
-
-        CacheSegment segment = cacheSegment.get();
-        if (cacheValue.isStub()) { // update cache size only when cache value 
is a stub
-            int size = message.getBody().size();
-
-            // update the cache size
-            segment.presentSegmentSize.addAndGet(size);
-            presentCacheSize.addAndGet(size);
-        }
-
-        synchronized (cacheValue) {
-            // finally add the message to the cache
-            cacheValue.setMessageAndInvokeCallbacks(message, currTime);
-        }
-
-        // maintain the index of seq-id
-        // no lock since threads are partitioned by topics
-        MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
-                                 cacheKey.getSeqId(), 
TreeSetLongFactory.instance);
-
-        // maintain the time index of addition
-        MapMethods.addToMultiMap(segment.timeIndexOfAddition, currTime,
-                                 cacheKey, HashSetCacheKeyFactory.instance);
-
-        collectOldOrExpiredCacheEntries(segment);
-    }
-
-    protected void removeMessageFromCache(final CacheKey cacheKey, Exception 
exception,
-                                          final boolean maintainTimeIndex,
-                                          final boolean maintainSeqIdIndex) {
-        CacheValue cacheValue = cache.remove(cacheKey);
-
-        if (cacheValue == null) {
-            return;
-        }
-
-        CacheSegment segment = cacheSegment.get();
-
-        long timeOfAddition = 0;
-        synchronized (cacheValue) {
-            if (cacheValue.isStub()) {
-                cacheValue.setErrorAndInvokeCallbacks(exception);
-                // Stubs are not present in the indexes, so don't need to 
maintain
-                // indexes here
-                return;
-            }
-
-            int size = 0 - cacheValue.getMessage().getBody().size();
-            presentCacheSize.addAndGet(size);
-            segment.presentSegmentSize.addAndGet(size);
-            timeOfAddition = cacheValue.getTimeOfAddition();
-        }
-
-        if (maintainSeqIdIndex) {
-            MapMethods.removeFromMultiMap(orderedIndexOnSeqId, 
cacheKey.getTopic(),
-                                          cacheKey.getSeqId());
-        }
-        if (maintainTimeIndex) {
-            MapMethods.removeFromMultiMap(segment.timeIndexOfAddition,
-                                          timeOfAddition, cacheKey);
-        }
-    }
-
-    /**
-     * Collection of old entries is simple. Just collect in insert-time order,
-     * oldest to newest.
-     */
-    protected void collectOldOrExpiredCacheEntries(CacheSegment segment) {
-        if (cacheEntryTTL > 0) {
-            // clear expired entries
-            while (!segment.timeIndexOfAddition.isEmpty()) {
-                Long earliestTime = segment.timeIndexOfAddition.firstKey();
-                if (MathUtils.now() - earliestTime < cacheEntryTTL) {
-                    break;
-                }
-                collectCacheEntriesAtTimestamp(segment, earliestTime);
-            }
-        }
-
-        while (segment.presentSegmentSize.get() > maxSegmentSize &&
-               !segment.timeIndexOfAddition.isEmpty()) {
-            Long earliestTime = segment.timeIndexOfAddition.firstKey();
-            collectCacheEntriesAtTimestamp(segment, earliestTime);
-        }
-    }
-
-    private void collectCacheEntriesAtTimestamp(CacheSegment segment, long 
timestamp) {
-        Set<CacheKey> oldCacheEntries = 
segment.timeIndexOfAddition.get(timestamp);
-
-        // Note: only concrete cache entries, and not stubs are in the time
-        // index. Hence there can be no callbacks pending on these cache
-        // entries. Hence safe to remove them directly.
-        for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); 
iter.hasNext();) {
-            final CacheKey cacheKey = iter.next();
-
-            logger.debug("Removing {} from cache because it's the oldest.", 
cacheKey);
-            removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
-                                   // maintainTimeIndex=
-                                   false,
-                                   // maintainSeqIdIndex=
-                                   true);
-        }
-
-        segment.timeIndexOfAddition.remove(timestamp);
-    }
-
-    /**
-     * ========================================================================
-     * The rest is just simple wrapper classes.
-     *
-     */
-
-    protected class ExceptionOnCacheKey implements CacheRequest {
-        CacheKey cacheKey;
-        Exception exception;
-
-        public ExceptionOnCacheKey(CacheKey cacheKey, Exception exception) {
-            this.cacheKey = cacheKey;
-            this.exception = exception;
-        }
-
-        /**
-         * If for some reason, an outstanding read on a cache stub fails,
-         * exception for that key is enqueued by the
-         * {@link ReadAheadScanCallback}. To handle this, we simply send error
-         * on the callbacks registered for that stub, and delete the entry from
-         * the cache
-         */
-        @Override
-        public void performRequest() {
-            removeMessageFromCache(cacheKey, exception,
-                                   // maintainTimeIndex=
-                                   true,
-                                   // maintainSeqIdIndex=
-                                   true);
-        }
-
-    }
-
-    @SuppressWarnings("serial")
-    protected static class NoSuchSeqIdException extends Exception {
-
-        public NoSuchSeqIdException() {
-            super("No such seq-id");
-        }
-    }
-
-    @SuppressWarnings("serial")
-    protected static class ReadAheadException extends Exception {
-        public ReadAheadException() {
-            super("Readahead failed");
-        }
-    }
-
-    public class CancelScanRequestOp implements CacheRequest {
-
-        final CancelScanRequest request;
-
-        public CancelScanRequestOp(CancelScanRequest request) {
-            this.request = request;
-        }
-
-        @Override
-        public void performRequest() {
-            // cancel scan request
-            cancelScanRequest(request.getScanRequest());
-        }
-
-        void cancelScanRequest(ScanRequest request) {
-            if (null == request) {
-                // nothing to cancel
-                return;
-            }
-
-            CacheKey cacheKey = new CacheKey(request.getTopic(), 
request.getStartSeqId());
-            CacheValue cacheValue = cache.get(cacheKey);
-            if (null == cacheValue) {
-                // cache value is evicted
-                // so it's callback would be called, we don't need to worry 
about
-                // cancel it. since it was treated as executed.
-                return;
-            }
-            cacheValue.removeCallback(request.getCallback(), request.getCtx());
-        }
-    }
-
-    public void cancelScanRequest(ByteString topic, CancelScanRequest request) 
{
-        enqueueWithoutFailureByTopic(topic, new CancelScanRequestOp(request));
-    }
-
-    protected class ScanResponse implements CacheRequest {
-        CacheKey cacheKey;
-        Message message;
-
-        public ScanResponse(CacheKey cacheKey, Message message) {
-            this.cacheKey = cacheKey;
-            this.message = message;
-        }
-
-        @Override
-        public void performRequest() {
-            addMessageToCache(cacheKey, message, MathUtils.now());
-        }
-
-    }
-
-    protected class DeliveredUntil implements CacheRequest {
-        ByteString topic;
-        Long seqId;
-
-        public DeliveredUntil(ByteString topic, Long seqId) {
-            this.topic = topic;
-            this.seqId = seqId;
-        }
-
-        @Override
-        public void performRequest() {
-            SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic);
-            if (orderedSeqIds == null) {
-                return;
-            }
-
-            // focus on the set of messages with seq-ids <= the one that
-            // has been delivered until
-            SortedSet<Long> headSet = orderedSeqIds.headSet(seqId + 1);
-
-            for (Iterator<Long> iter = headSet.iterator(); iter.hasNext();) {
-                Long seqId = iter.next();
-                CacheKey cacheKey = new CacheKey(topic, seqId);
-
-                logger.debug("Removing {} from cache because every subscriber 
has moved past",
-                    cacheKey);
-
-                removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
-                                       // maintainTimeIndex=
-                                       true,
-                                       // maintainSeqIdIndex=
-                                       false);
-                iter.remove();
-            }
-
-            if (orderedSeqIds.isEmpty()) {
-                orderedIndexOnSeqId.remove(topic);
-            }
-        }
-    }
-
-    protected class ScanRequestWrapper implements CacheRequest {
-        ScanRequest request;
-
-        public ScanRequestWrapper(ScanRequest request) {
-            this.request = request;
-        }
-
-        /**
-         * To handle a scan request, we first try to do readahead (which might
-         * cause a range read to be issued to the underlying persistence
-         * manager). The readahead will put a stub in the cache, if the message
-         * is not already present in the cache. The scan callback that is part
-         * of the scan request is added to this stub, and will be called later
-         * when the message arrives as a result of the range scan issued to the
-         * underlying persistence manager.
-         */
-
-        @Override
-        public void performRequest() {
-
-            RangeScanRequest readAheadRequest = doReadAhead(request);
-
-            // Read ahead must have installed at least a stub for us, so this
-            // can't be null
-            CacheKey cacheKey = new CacheKey(request.getTopic(), 
request.getStartSeqId());
-            CacheValue cacheValue = cache.get(cacheKey);
-            if (null == cacheValue) {
-                logger.error("Cache key {} is removed after installing stub 
when scanning.", cacheKey);
-                // reissue the request
-                scanSingleMessage(request);
-                return;
-            }
-
-            synchronized (cacheValue) {
-                // Add our callback to the stub. If the cache value was 
already a
-                // concrete message, the callback will be called right away
-                cacheValue.addCallback(request.getCallback(), 
request.getCtx());
-            }
-
-            if (readAheadRequest != null) {
-                realPersistenceManager.scanMessages(readAheadRequest);
-            }
-        }
-    }
-
-    @Override
-    public void registerJMX(HedwigMBeanInfo parent) {
-        try {
-            jmxCacheBean = new ReadAheadCacheBean(this);
-            HedwigMBeanRegistry.getInstance().register(jmxCacheBean, parent);
-        } catch (Exception e) {
-            logger.warn("Failed to register readahead cache with JMX", e);
-            jmxCacheBean = null;
-        }
-    }
-
-    @Override
-    public void unregisterJMX() {
-        try {
-            if (jmxCacheBean != null) {
-                HedwigMBeanRegistry.getInstance().unregister(jmxCacheBean);
-            }
-        } catch (Exception e) {
-            logger.warn("Failed to unregister readahead cache with JMX", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
deleted file mode 100644
index 1f43095..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.persistence;
-
-import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
-
-/**
- * Read Ahead Cache Bean
- */
-public class ReadAheadCacheBean implements ReadAheadCacheMXBean,
-        HedwigMBeanInfo {
-
-    ReadAheadCache cache;
-    public ReadAheadCacheBean(ReadAheadCache cache) {
-        this.cache = cache;
-    }
-
-    @Override
-    public String getName() {
-        return "ReadAheadCache";
-    }
-
-    @Override
-    public boolean isHidden() {
-        return false;
-    }
-
-    @Override
-    public long getMaxCacheSize() {
-        return cache.cfg.getMaximumCacheSize();
-    }
-
-    @Override
-    public long getPresentCacheSize() {
-        return cache.presentCacheSize.get();
-    }
-
-    @Override
-    public int getNumCachedEntries() {
-        return cache.cache.size();
-    }
-
-    @Override
-    public int getNumPendingCacheRequests() {
-        return cache.numPendingRequests.get();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java
deleted file mode 100644
index eba77a0..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.persistence;
-
-/**
- * Read Ahead Cache MBean
- */
-public interface ReadAheadCacheMXBean {
-
-    /**
-     * @return max cache size
-     */
-    public long getMaxCacheSize();
-
-    /**
-     * @return present cache size
-     */
-    public long getPresentCacheSize();
-
-    /**
-     * @return number of cached entries
-     */
-    public int getNumCachedEntries();
-
-    /**
-     * @return number of pending cache requests
-     */
-    public int getNumPendingCacheRequests();
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
deleted file mode 100644
index 42ebb93..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-
-public interface ScanCallback {
-
-    enum ReasonForFinish {
-        NO_MORE_MESSAGES, SIZE_LIMIT_EXCEEDED, NUM_MESSAGES_LIMIT_EXCEEDED
-    };
-
-    /**
-     * This method is called when a message is read from the persistence layer
-     * as part of a scan. The message just read is handed to this listener 
which
-     * can then take the desired action on it. The return value from the method
-     * indicates whether the scan should continue or not.
-     *
-     * @param ctx
-     *            The context for the callback
-     * @param message
-     *            The message just scanned from the log
-     * @return true if the scan should continue, false otherwise
-     */
-    public void messageScanned(Object ctx, Message message);
-
-    /**
-     * This method is called when the scan finishes
-     *
-     *
-     * @param ctx
-     * @param reason
-     */
-
-    public abstract void scanFinished(Object ctx, ReasonForFinish reason);
-
-    /**
-     * This method is called when the operation failed due to some reason. The
-     * reason for failure is passed in.
-     *
-     * @param ctx
-     *            The context for the callback
-     * @param exception
-     *            The reason for the failure of the scan
-     */
-    public abstract void scanFailed(Object ctx, Exception exception);
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java
deleted file mode 100644
index a39a197..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-public class ScanCallbackWithContext {
-    ScanCallback scanCallback;
-    Object ctx;
-
-    public ScanCallbackWithContext(ScanCallback callback, Object ctx) {
-        this.scanCallback = callback;
-        this.ctx = ctx;
-    }
-
-    public ScanCallback getScanCallback() {
-        return scanCallback;
-    }
-
-    public Object getCtx() {
-        return ctx;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof ScanCallbackWithContext)) {
-            return false;
-        }
-        ScanCallbackWithContext otherCb =
-            (ScanCallbackWithContext) other;
-        // Ensure that it was same callback & same ctx
-        return scanCallback == otherCb.scanCallback &&
-               ctx == otherCb.ctx;
-    }
-
-    @Override
-    public int hashCode() {
-        return scanCallback.hashCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
deleted file mode 100644
index c985840..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-
-/**
- * Encapsulates a request for reading a single message. The message on the 
given
- * topic <b>at</b> the given seqId is scanned. A call-back {@link ScanCallback}
- * is provided. When the message is scanned, the
- * {@link ScanCallback#messageScanned(Object, Message)} method is called. Since
- * there is only 1 record to be scanned the
- * {@link ScanCallback#operationFinished(Object)} method may not be called 
since
- * its redundant.
- * {@link ScanCallback#scanFailed(Object, 
org.apache.hedwig.exceptions.PubSubException)}
- * method is called in case of error.
- *
- */
-public class ScanRequest {
-    ByteString topic;
-    long startSeqId;
-    ScanCallback callback;
-    Object ctx;
-
-    public ScanRequest(ByteString topic, long startSeqId, ScanCallback 
callback, Object ctx) {
-        this.topic = topic;
-        this.startSeqId = startSeqId;
-        this.callback = callback;
-        this.ctx = ctx;
-    }
-
-    public ByteString getTopic() {
-        return topic;
-    }
-
-    public long getStartSeqId() {
-        return startSeqId;
-    }
-
-    public ScanCallback getCallback() {
-        return callback;
-    }
-
-    public Object getCtx() {
-        return ctx;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
----------------------------------------------------------------------
diff --git 
a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
 
b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
deleted file mode 100644
index 10f0889..0000000
--- 
a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.util.Callback;
-
-public class ChannelTracker implements ChannelDisconnectListener {
-    HashMap<TopicSubscriber, Channel> topicSub2Channel = new 
HashMap<TopicSubscriber, Channel>();
-    HashMap<Channel, List<TopicSubscriber>> channel2TopicSubs = new 
HashMap<Channel, List<TopicSubscriber>>();
-    Subscriber subscriber;
-
-    public ChannelTracker(Subscriber subscriber) {
-        this.subscriber = subscriber;
-    }
-
-    static Callback<Void> noOpCallback = new Callback<Void>() {
-        public void operationFailed(Object ctx, PubSubException exception) {
-        };
-
-        public void operationFinished(Object ctx, Void resultOfOperation) {
-        };
-    };
-
-    public synchronized void channelDisconnected(Channel channel) {
-        List<TopicSubscriber> topicSubs = channel2TopicSubs.remove(channel);
-
-        if (topicSubs == null) {
-            return;
-        }
-
-        for (TopicSubscriber topicSub : topicSubs) {
-            topicSub2Channel.remove(topicSub);
-            subscriber.asyncCloseSubscription(topicSub.getTopic(), 
topicSub.getSubscriberId(), noOpCallback, null);
-        }
-    }
-
-    public synchronized void subscribeSucceeded(TopicSubscriber 
topicSubscriber, Channel channel)
-            throws TopicBusyException {
-
-        if (!channel.isConnected()) {
-            // channel got disconnected while we were processing the
-            // subscribe request, nothing much we can do in this case
-            return;
-        }
-
-        if (topicSub2Channel.containsKey(topicSubscriber)) {
-            TopicBusyException pse = new PubSubException.TopicBusyException(
-                "subscription for this topic, subscriberId is already being 
served on a different channel");
-            throw pse;
-        }
-
-        topicSub2Channel.put(topicSubscriber, channel);
-
-        List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel);
-
-        if (topicSubs == null) {
-            topicSubs = new LinkedList<TopicSubscriber>();
-            channel2TopicSubs.put(channel, topicSubs);
-        }
-        topicSubs.add(topicSubscriber);
-
-    }
-
-    public void aboutToCloseSubscription(ByteString topic, ByteString 
subscriberId) {
-        removeSubscriber(topic, subscriberId);
-    } 
-
-    public void aboutToUnsubscribe(ByteString topic, ByteString subscriberId) {
-        removeSubscriber(topic, subscriberId);
-    }
-
-    private synchronized void removeSubscriber(ByteString topic, ByteString 
subscriberId) {
-        TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
-
-        Channel channel = topicSub2Channel.remove(topicSub);
-
-        if (channel != null) {
-            List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel);
-            if (topicSubs != null) {
-                topicSubs.remove(topicSub);
-            }
-        }
-    }
-
-    public synchronized void checkChannelMatches(ByteString topic, ByteString 
subscriberId, Channel channel)
-            throws PubSubException {
-        Channel subscribedChannel = getChannel(topic, subscriberId);
-
-        if (subscribedChannel == null) {
-            throw new PubSubException.ClientNotSubscribedException(
-                "Can't start delivery since client is not subscribed");
-        }
-
-        if (subscribedChannel != channel) {
-            throw new PubSubException.TopicBusyException(
-                "Can't start delivery since client is subscribed on a 
different channel");
-        }
-
-    }
-
-    public synchronized Channel getChannel(ByteString topic, ByteString 
subscriberId) {
-        return topicSub2Channel.get(new TopicSubscriber(topic, subscriberId));
-    }
-
-}

Reply via email to