http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java deleted file mode 100644 index b3c5dc9..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java +++ /dev/null @@ -1,491 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import java.math.BigInteger; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - -import javax.sql.rowset.serial.SerialBlob; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Charsets.UTF_8; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.FileUtils; - -public class LocalDBPersistenceManager implements PersistenceManagerWithRangeScan { - private static final Logger logger = LoggerFactory.getLogger(LocalDBPersistenceManager.class); - - static String connectionURL; - - static { - try { - File tempDir = FileUtils.createTempDirectory("derby", null); - - // Since derby needs to create it, I will have to delete it first - if (!tempDir.delete()) { - throw new IOException("Could not delete dir: " + tempDir.getAbsolutePath()); - } - connectionURL = "jdbc:derby:" + tempDir.getAbsolutePath() + ";create=true"; - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - - private static final ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<Connection>() { - @Override - protected Connection initialValue() { - try { - return DriverManager.getConnection(connectionURL); - } catch (SQLException e) { - logger.error("Could not connect to derby", e); - return null; - } - } - }; - - private static final ThreadLocal<MessageDigest> threadLocalDigest = new ThreadLocal<MessageDigest>() { - @Override - protected MessageDigest initialValue() { - try { - return MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { - logger.error("Could not find MD5 hash", e); - return null; - } - } - }; - static final String ID_FIELD_NAME = "id"; - static final String MSG_FIELD_NAME = "msg"; - static final String driver = "org.apache.derby.jdbc.EmbeddedDriver"; - - static final int SCAN_CHUNK = 1000; - - /** - * Having trouble restarting the database multiple times from within the - * same jvm. Hence to facilitate units tests, we are just going to have a - * version number that we will append to every table name. This version - * number will be incremented in lieu of shutting down the database and - * restarting it, so that we get different table names, and it behaves like - * a brand new database - */ - private int version = 0; - - ConcurrentMap<ByteString, MessageSeqId> currTopicSeqIds = new ConcurrentHashMap<ByteString, MessageSeqId>(); - - static LocalDBPersistenceManager instance = new LocalDBPersistenceManager(); - - public static LocalDBPersistenceManager instance() { - return instance; - } - - private LocalDBPersistenceManager() { - - try { - Class.forName(driver).newInstance(); - logger.info("Derby Driver loaded"); - } catch (java.lang.ClassNotFoundException e) { - logger.error("Derby driver not found", e); - } catch (InstantiationException e) { - logger.error("Could not instantiate derby driver", e); - } catch (IllegalAccessException e) { - logger.error("Could not instantiate derby driver", e); - } - } - - @Override - public void stop() { - // do nothing - } - - /** - * Ensures that at least the default seq-id exists in the map for the given - * topic. Checks for race conditions (.e.g, another thread inserts the - * default id before us), and returns the latest seq-id value in the map - * - * @param topic - * @return - */ - private MessageSeqId ensureSeqIdExistsForTopic(ByteString topic) { - MessageSeqId presentSeqIdInMap = currTopicSeqIds.get(topic); - - if (presentSeqIdInMap != null) { - return presentSeqIdInMap; - } - - presentSeqIdInMap = MessageSeqId.newBuilder().setLocalComponent(0).build(); - MessageSeqId oldSeqIdInMap = currTopicSeqIds.putIfAbsent(topic, presentSeqIdInMap); - - if (oldSeqIdInMap != null) { - return oldSeqIdInMap; - } - return presentSeqIdInMap; - - } - - /** - * Adjust the current seq id of the topic based on the message we are about - * to publish. The local component of the current seq-id is always - * incremented by 1. For the other components, there are two cases: - * - * 1. If the message to be published doesn't have a seq-id (locally - * published messages), the other components are left as is. - * - * 2. If the message to be published has a seq-id, we take the max of the - * current one we have, and that in the message to be published. - * - * @param topic - * @param messageToPublish - * @return The value of the local seq-id obtained after incrementing the - * local component. This value should be used as an id while - * persisting to Derby - * @throws UnexpectedConditionException - */ - private long adjustTopicSeqIdForPublish(ByteString topic, Message messageToPublish) - throws UnexpectedConditionException { - long retValue = 0; - MessageSeqId oldId; - MessageSeqId.Builder newIdBuilder = MessageSeqId.newBuilder(); - - do { - oldId = ensureSeqIdExistsForTopic(topic); - - // Increment our own component by 1 - retValue = oldId.getLocalComponent() + 1; - newIdBuilder.setLocalComponent(retValue); - - if (messageToPublish.hasMsgId()) { - // take a region-wise max - MessageIdUtils.takeRegionMaximum(newIdBuilder, messageToPublish.getMsgId(), oldId); - - } else { - newIdBuilder.addAllRemoteComponents(oldId.getRemoteComponentsList()); - } - } while (!currTopicSeqIds.replace(topic, oldId, newIdBuilder.build())); - - return retValue; - - } - - public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) { - return seqId + skipAmount; - } - - public void persistMessage(PersistRequest request) { - - Connection conn = threadLocalConnection.get(); - - Callback<MessageSeqId> callback = request.getCallback(); - Object ctx = request.getCtx(); - ByteString topic = request.getTopic(); - Message message = request.getMessage(); - - if (conn == null) { - callback.operationFailed(ctx, new ServiceDownException("Not connected to derby")); - return; - } - - long seqId; - - try { - seqId = adjustTopicSeqIdForPublish(topic, message); - } catch (UnexpectedConditionException e) { - callback.operationFailed(ctx, e); - return; - } - PreparedStatement stmt; - - boolean triedCreatingTable = false; - while (true) { - try { - message.getBody(); - stmt = conn.prepareStatement("INSERT INTO " + getTableNameForTopic(topic) + " VALUES(?,?)"); - stmt.setLong(1, seqId); - stmt.setBlob(2, new SerialBlob(message.toByteArray())); - - int rowCount = stmt.executeUpdate(); - stmt.close(); - if (rowCount != 1) { - logger.error("Unexpected number of affected rows from derby"); - callback.operationFailed(ctx, new ServiceDownException("Unexpected response from derby")); - return; - } - break; - } catch (SQLException sqle) { - String theError = (sqle).getSQLState(); - if (theError.equals("42X05") && !triedCreatingTable) { - createTable(conn, topic); - triedCreatingTable = true; - continue; - } - - logger.error("Error while executing derby insert", sqle); - callback.operationFailed(ctx, new ServiceDownException(sqle)); - return; - } - } - callback.operationFinished(ctx, MessageIdUtils.mergeLocalSeqId(message, seqId).getMsgId()); - } - - /* - * This method does not throw an exception because another thread might - * sneak in and create the table before us - */ - private void createTable(Connection conn, ByteString topic) { - Statement stmt = null; - try { - stmt = conn.createStatement(); - String tableName = getTableNameForTopic(topic); - stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_" - + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)"); - } catch (SQLException e) { - logger.debug("Could not create table", e); - } finally { - try { - if (stmt != null) { - stmt.close(); - } - } catch (SQLException e) { - logger.error("Error closing statement", e); - } - } - } - - public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) { - return ensureSeqIdExistsForTopic(topic); - } - - public void scanSingleMessage(ScanRequest request) { - scanMessagesInternal(request.getTopic(), request.getStartSeqId(), 1, Long.MAX_VALUE, request.getCallback(), - request.getCtx(), 1); - return; - } - - public void scanMessages(RangeScanRequest request) { - scanMessagesInternal(request.getTopic(), request.getStartSeqId(), request.getMessageLimit(), request - .getSizeLimit(), request.getCallback(), request.getCtx(), SCAN_CHUNK); - return; - } - - private String getTableNameForTopic(ByteString topic) { - String src = (topic.toStringUtf8() + "_" + version); - threadLocalDigest.get().reset(); - byte[] digest = threadLocalDigest.get().digest(src.getBytes(UTF_8)); - BigInteger bigInt = new BigInteger(1,digest); - return String.format("TABLE_%032X", bigInt); - } - - private void scanMessagesInternal(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, - ScanCallback callback, Object ctx, int scanChunk) { - - Connection conn = threadLocalConnection.get(); - - if (conn == null) { - callback.scanFailed(ctx, new ServiceDownException("Not connected to derby")); - return; - } - - long currentSeqId; - currentSeqId = startSeqId; - - PreparedStatement stmt = null; - try { - try { - stmt = conn.prepareStatement("SELECT * FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME - + " >= ? AND " + ID_FIELD_NAME + " <= ?"); - - } catch (SQLException sqle) { - String theError = (sqle).getSQLState(); - if (theError.equals("42X05")) { - // No table, scan is over - callback.scanFinished(ctx, ReasonForFinish.NO_MORE_MESSAGES); - return; - } else { - throw sqle; - } - } - - int numMessages = 0; - long totalSize = 0; - - while (true) { - - stmt.setLong(1, currentSeqId); - stmt.setLong(2, currentSeqId + scanChunk); - - if (!stmt.execute()) { - String errorMsg = "Select query did not return a result set"; - logger.error(errorMsg); - stmt.close(); - callback.scanFailed(ctx, new ServiceDownException(errorMsg)); - return; - } - - ResultSet resultSet = stmt.getResultSet(); - - if (!resultSet.next()) { - stmt.close(); - callback.scanFinished(ctx, ReasonForFinish.NO_MORE_MESSAGES); - return; - } - - do { - - long localSeqId = resultSet.getLong(1); - - Message.Builder messageBuilder = Message.newBuilder().mergeFrom(resultSet.getBinaryStream(2)); - - // Merge in the local seq-id since that is not stored with - // the message - Message message = MessageIdUtils.mergeLocalSeqId(messageBuilder, localSeqId); - - callback.messageScanned(ctx, message); - numMessages++; - totalSize += message.getBody().size(); - - if (numMessages > messageLimit) { - stmt.close(); - callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED); - return; - } else if (totalSize > sizeLimit) { - stmt.close(); - callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED); - return; - } - - } while (resultSet.next()); - - currentSeqId += SCAN_CHUNK; - } - } catch (SQLException e) { - logger.error("SQL Exception", e); - callback.scanFailed(ctx, new ServiceDownException(e)); - return; - } catch (IOException e) { - logger.error("Message stored in derby is not parseable", e); - callback.scanFailed(ctx, new ServiceDownException(e)); - return; - } finally { - try { - if (stmt != null) { - stmt.close(); - } - } catch (SQLException e) { - logger.error("Error closing statement", e); - } - } - } - - public void deliveredUntil(ByteString topic, Long seqId) { - // noop - } - - public void consumedUntil(ByteString topic, Long seqId) { - Connection conn = threadLocalConnection.get(); - if (conn == null) { - logger.error("Not connected to derby"); - return; - } - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement("DELETE FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME - + " <= ?"); - stmt.setLong(1, seqId); - int rowCount = stmt.executeUpdate(); - if (logger.isDebugEnabled()) { - logger.debug("Deleted " + rowCount + " records for topic: " + topic.toStringUtf8() - + ", seqId: " + seqId); - } - } catch (SQLException sqle) { - String theError = (sqle).getSQLState(); - if (theError.equals("42X05")) { - logger.warn("Table for topic (" + topic + ") does not exist so no consumed messages to delete!"); - } else - logger.error("Error while executing derby delete for consumed messages", sqle); - } finally { - try { - if (stmt != null) { - stmt.close(); - } - } catch (SQLException e) { - logger.error("Error closing statement", e); - } - } - } - - public void setMessageBound(ByteString topic, Integer bound) { - // noop; Maybe implement later - } - - public void clearMessageBound(ByteString topic) { - // noop; Maybe implement later - } - - public void consumeToBound(ByteString topic) { - // noop; Maybe implement later - } - - @Override - protected void finalize() throws Throwable { - if (driver.equals("org.apache.derby.jdbc.EmbeddedDriver")) { - boolean gotSQLExc = false; - // This is weird: on normal shutdown, it throws an exception - try { - DriverManager.getConnection("jdbc:derby:;shutdown=true").close(); - } catch (SQLException se) { - if (se.getSQLState().equals("XJ015")) { - gotSQLExc = true; - } - } - if (!gotSQLExc) { - logger.error("Database did not shut down normally"); - } else { - logger.info("Database shut down normally"); - } - } - super.finalize(); - } - - public void reset() { - // just move the namespace over to the next one - version++; - currTopicSeqIds.clear(); - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java deleted file mode 100644 index f640723..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/MapMethods.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.Collection; -import java.util.Map; - -public class MapMethods { - - public static <K, V> V getAfterInsertingIfAbsent(Map<K, V> map, K key, Factory<V> valueFactory) { - V value = map.get(key); - - if (value == null) { - value = valueFactory.newInstance(); - map.put(key, value); - } - - return value; - } - - public static <K, V, Z extends Collection<V>> void addToMultiMap(Map<K, Z> map, K key, V value, - Factory<Z> valueFactory) { - Collection<V> collection = getAfterInsertingIfAbsent(map, key, valueFactory); - - collection.add(value); - - } - - public static <K, V, Z extends Collection<V>> boolean removeFromMultiMap(Map<K, Z> map, K key, V value) { - Collection<V> collection = map.get(key); - - if (collection == null) { - return false; - } - - if (!collection.remove(value)) { - return false; - } else { - if (collection.isEmpty()) { - map.remove(key); - } - return true; - } - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java deleted file mode 100644 index d137fe6..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistRequest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.util.Callback; - -/** - * Encapsulates a request to persist a given message on a given topic. The - * request is completed asynchronously, callback and context are provided - * - */ -public class PersistRequest { - ByteString topic; - Message message; - private Callback<PubSubProtocol.MessageSeqId> callback; - Object ctx; - - public PersistRequest(ByteString topic, Message message, Callback<PubSubProtocol.MessageSeqId> callback, Object ctx) { - this.topic = topic; - this.message = message; - this.callback = callback; - this.ctx = ctx; - } - - public ByteString getTopic() { - return topic; - } - - public Message getMessage() { - return message; - } - - public Callback<PubSubProtocol.MessageSeqId> getCallback() { - return callback; - } - - public Object getCtx() { - return ctx; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java deleted file mode 100644 index a295fc7..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; - -/** - * An implementation of this interface will persist messages in order and assign - * a seqId to each persisted message. SeqId need not be a single number in - * general. SeqId is opaque to all layers above {@link PersistenceManager}. Only - * the {@link PersistenceManager} needs to understand the format of the seqId - * and maintain it in such a way that there is a total order on the seqIds of a - * topic. - * - */ -public interface PersistenceManager { - - /** - * Executes the given persist request asynchronously. When done, the - * callback specified in the request object is called with the result of the - * operation set to the {@link LocalMessageSeqId} assigned to the persisted - * message. - */ - public void persistMessage(PersistRequest request); - - /** - * Get the seqId of the last message that has been persisted to the given - * topic. The returned seqId will be set as the consume position of any - * brand new subscription on this topic. - * - * Note that the return value may quickly become invalid because a - * {@link #persistMessage(String, PublishedMessage)} call from another - * thread succeeds. For us, the typical use case is choosing the consume - * position of a new subscriber. Since the subscriber need not receive all - * messages that are published while the subscribe call is in progress, such - * loose semantics from this method is acceptable. - * - * @param topic - * @return the seqId of the last persisted message. - * @throws ServerNotResponsibleForTopicException - */ - public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException; - - /** - * Executes the given scan request - * - */ - public void scanSingleMessage(ScanRequest request); - - /** - * Gets the next seq-id. This method should never block. - */ - public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount); - - /** - * Hint that the messages until the given seqId have been delivered and wont - * be needed unless there is a failure of some kind - */ - public void deliveredUntil(ByteString topic, Long seqId); - - /** - * Hint that the messages until the given seqId have been consumed by all - * subscribers to the topic and no longer need to be stored. The - * implementation classes can decide how and if they want to garbage collect - * and delete these older topic messages that are no longer needed. - * - * @param topic - * Topic - * @param seqId - * Message local sequence ID - */ - public void consumedUntil(ByteString topic, Long seqId); - - public void setMessageBound(ByteString topic, Integer bound); - public void clearMessageBound(ByteString topic); - public void consumeToBound(ByteString topic); - - /** - * Stop persistence manager. - */ - public void stop(); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java deleted file mode 100644 index f12174f..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -public interface PersistenceManagerWithRangeScan extends PersistenceManager { - /** - * Executes the given range scan request - * - * @param request - */ - public void scanMessages(RangeScanRequest request); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java deleted file mode 100644 index 3ac324d..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import com.google.protobuf.ByteString; - -/** - * Encapsulates a request to scan messages on the given topic starting from the - * given seqId (included). A call-back {@link ScanCallback} is provided. As - * messages are scanned, the relevant methods of the {@link ScanCallback} are - * called. Two hints are provided as to when scanning should stop: in terms of - * number of messages scanned, or in terms of the total size of messages - * scanned. Scanning stops whenever one of these limits is exceeded. These - * checks, especially the one about message size, are only approximate. The - * {@link ScanCallback} used should be prepared to deal with more or less - * messages scanned. If an error occurs during scanning, the - * {@link ScanCallback} is notified of the error. - * - */ -public class RangeScanRequest { - ByteString topic; - long startSeqId; - int messageLimit; - long sizeLimit; - ScanCallback callback; - Object ctx; - - public RangeScanRequest(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, ScanCallback callback, - Object ctx) { - this.topic = topic; - this.startSeqId = startSeqId; - this.messageLimit = messageLimit; - this.sizeLimit = sizeLimit; - this.callback = callback; - this.ctx = ctx; - } - - public ByteString getTopic() { - return topic; - } - - public long getStartSeqId() { - return startSeqId; - } - - public int getMessageLimit() { - return messageLimit; - } - - public long getSizeLimit() { - return sizeLimit; - } - - public ScanCallback getCallback() { - return callback; - } - - public Object getCtx() { - return ctx; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java deleted file mode 100644 index 48be3e8..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java +++ /dev/null @@ -1,865 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.apache.bookkeeper.util.SafeRunnable; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.jmx.HedwigJMXService; -import org.apache.hedwig.server.jmx.HedwigMBeanInfo; -import org.apache.hedwig.server.jmx.HedwigMBeanRegistry; -import org.apache.hedwig.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -public class ReadAheadCache implements PersistenceManager, HedwigJMXService { - - private static final Logger logger = LoggerFactory.getLogger(ReadAheadCache.class); - - protected interface CacheRequest { - public void performRequest(); - } - - /** - * The underlying persistence manager that will be used for persistence and - * scanning below the cache - */ - protected PersistenceManagerWithRangeScan realPersistenceManager; - - /** - * The structure for the cache - */ - protected ConcurrentMap<CacheKey, CacheValue> cache = - new ConcurrentHashMap<CacheKey, CacheValue>(); - - /** - * We also want to track the entries in seq-id order so that we can clean up - * entries after the last subscriber - */ - protected ConcurrentMap<ByteString, SortedSet<Long>> orderedIndexOnSeqId = - new ConcurrentHashMap<ByteString, SortedSet<Long>>(); - - /** - * Partition Cache into Serveral Segments for simplify synchronization. - * Each segment maintains its time index and segment size. - */ - static class CacheSegment { - - /** - * We want to keep track of when entries were added in the cache, so that we - * can remove them in a FIFO fashion - */ - protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap<Long, Set<CacheKey>>(); - - /** - * We maintain an estimate of the current size of each cache segment, - * so that the thread know when to evict entries from cache segment. - */ - protected AtomicLong presentSegmentSize = new AtomicLong(0); - - } - - /** - * We maintain an estimate of the current size of the cache, so that we know - * when to evict entries. - */ - protected AtomicLong presentCacheSize = new AtomicLong(0); - - /** - * Num pending requests. - */ - protected AtomicInteger numPendingRequests = new AtomicInteger(0); - - /** - * Cache segment for different threads - */ - protected final ThreadLocal<CacheSegment> cacheSegment = - new ThreadLocal<CacheSegment>() { - @Override - protected CacheSegment initialValue() { - return new CacheSegment(); - } - }; - - /** - * One instance of a callback that we will pass to the underlying - * persistence manager when asking it to persist messages - */ - protected PersistCallback persistCallbackInstance = new PersistCallback(); - - /** - * 2 kinds of exceptions that we will use to signal error from readahead - */ - protected NoSuchSeqIdException noSuchSeqIdExceptionInstance = new NoSuchSeqIdException(); - protected ReadAheadException readAheadExceptionInstance = new ReadAheadException(); - - protected ServerConfiguration cfg; - // Boolean indicating if this thread should continue running. This is used - // when we want to stop the thread during a PubSubServer shutdown. - protected volatile boolean keepRunning = true; - - protected final OrderedSafeExecutor cacheWorkers; - protected final int numCacheWorkers; - protected volatile long maxSegmentSize; - protected volatile long cacheEntryTTL; - - // JMX Beans - ReadAheadCacheBean jmxCacheBean = null; - - /** - * Constructor. Starts the cache maintainer thread - * - * @param realPersistenceManager - */ - public ReadAheadCache(PersistenceManagerWithRangeScan realPersistenceManager, ServerConfiguration cfg) { - this.realPersistenceManager = realPersistenceManager; - this.cfg = cfg; - numCacheWorkers = cfg.getNumReadAheadCacheThreads(); - cacheWorkers = OrderedSafeExecutor.newBuilder() - .name("ReadAheadCacheScheduler") - .numThreads(numCacheWorkers) - .build(); - reloadConf(cfg); - } - - /** - * Reload configuration - * - * @param conf - * Server configuration object - */ - protected void reloadConf(ServerConfiguration cfg) { - maxSegmentSize = cfg.getMaximumCacheSize() / numCacheWorkers; - cacheEntryTTL = cfg.getCacheEntryTTL(); - } - - public ReadAheadCache start() { - return this; - } - - /** - * ======================================================================== - * Methods of {@link PersistenceManager} that we will pass straight down to - * the real persistence manager. - */ - - @Override - public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) { - return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, skipAmount); - } - - @Override - public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException { - return realPersistenceManager.getCurrentSeqIdForTopic(topic); - } - - /** - * ======================================================================== - * Other methods of {@link PersistenceManager} that the cache needs to take - * some action on. - * - * 1. Persist: We pass it through to the real persistence manager but insert - * our callback on the return path - * - */ - @Override - public void persistMessage(PersistRequest request) { - // make a new PersistRequest object so that we can insert our own - // callback in the middle. Assign the original request as the context - // for the callback. - - PersistRequest newRequest = new PersistRequest(request.getTopic(), request.getMessage(), - persistCallbackInstance, request); - realPersistenceManager.persistMessage(newRequest); - } - - /** - * The callback that we insert on the persist request return path. The - * callback simply forms a {@link PersistResponse} object and inserts it in - * the request queue to be handled serially by the cache maintainer thread. - * - */ - public class PersistCallback implements Callback<PubSubProtocol.MessageSeqId> { - - /** - * In case there is a failure in persisting, just pass it to the - * original callback - */ - @Override - public void operationFailed(Object ctx, PubSubException exception) { - PersistRequest originalRequest = (PersistRequest) ctx; - Callback<PubSubProtocol.MessageSeqId> originalCallback = originalRequest.getCallback(); - Object originalContext = originalRequest.getCtx(); - originalCallback.operationFailed(originalContext, exception); - } - - /** - * When the persist finishes, we first notify the original callback of - * success, and then opportunistically treat the message as if it just - * came in through a scan - */ - @Override - public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) { - PersistRequest originalRequest = (PersistRequest) ctx; - - // Lets call the original callback first so that the publisher can - // hear success - originalRequest.getCallback().operationFinished(originalRequest.getCtx(), resultOfOperation); - - // Original message that was persisted didn't have the local seq-id. - // Lets add that in - Message messageWithLocalSeqId = MessageIdUtils.mergeLocalSeqId(originalRequest.getMessage(), - resultOfOperation.getLocalComponent()); - - // Now enqueue a request to add this newly persisted message to our - // cache - CacheKey cacheKey = new CacheKey(originalRequest.getTopic(), resultOfOperation.getLocalComponent()); - - enqueueWithoutFailureByTopic(cacheKey.getTopic(), - new ScanResponse(cacheKey, messageWithLocalSeqId)); - } - - } - - protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) { - if (!keepRunning) { - return; - } - try { - numPendingRequests.incrementAndGet(); - cacheWorkers.submitOrdered(topic, new SafeRunnable() { - @Override - public void safeRun() { - numPendingRequests.decrementAndGet(); - obj.performRequest(); - } - }); - } catch (RejectedExecutionException ree) { - logger.error("Failed to submit cache request for topic " + topic.toStringUtf8() + " : ", ree); - } - } - - /** - * Another method from {@link PersistenceManager}. - * - * 2. Scan - Since the scan needs to touch the cache, we will just enqueue - * the scan request and let the cache maintainer thread handle it. - */ - @Override - public void scanSingleMessage(ScanRequest request) { - // Let the scan requests be serialized through the queue - enqueueWithoutFailureByTopic(request.getTopic(), - new ScanRequestWrapper(request)); - } - - /** - * Another method from {@link PersistenceManager}. - * - * 3. Enqueue the request so that the cache maintainer thread can delete all - * message-ids older than the one specified - */ - @Override - public void deliveredUntil(ByteString topic, Long seqId) { - enqueueWithoutFailureByTopic(topic, new DeliveredUntil(topic, seqId)); - } - - /** - * Another method from {@link PersistenceManager}. - * - * Since this is a cache layer on top of an underlying persistence manager, - * we can just call the consumedUntil method there. The messages older than - * the latest one passed here won't be accessed anymore so they should just - * get aged out of the cache eventually. For now, there is no need to - * proactively remove those entries from the cache. - */ - @Override - public void consumedUntil(ByteString topic, Long seqId) { - realPersistenceManager.consumedUntil(topic, seqId); - } - - @Override - public void setMessageBound(ByteString topic, Integer bound) { - realPersistenceManager.setMessageBound(topic, bound); - } - - @Override - public void clearMessageBound(ByteString topic) { - realPersistenceManager.clearMessageBound(topic); - } - - @Override - public void consumeToBound(ByteString topic) { - realPersistenceManager.consumeToBound(topic); - } - - /** - * Stop the readahead cache. - */ - @Override - public void stop() { - try { - keepRunning = false; - cacheWorkers.shutdown(); - } catch (Exception e) { - logger.warn("Failed to shut down cache workers : ", e); - } - } - - /** - * The readahead policy is simple: We check if an entry already exists for - * the message being requested. If an entry exists, it means that either - * that message is already in the cache, or a read for that message is - * outstanding. In that case, we look a little ahead (by readAheadCount/2) - * and issue a range read of readAheadCount/2 messages. The idea is to - * ensure that the next readAheadCount messages are always available. - * - * @return the range scan that should be issued for read ahead - */ - protected RangeScanRequest doReadAhead(ScanRequest request) { - ByteString topic = request.getTopic(); - Long seqId = request.getStartSeqId(); - - int readAheadCount = cfg.getReadAheadCount(); - // To prevent us from getting screwed by bad configuration - readAheadCount = Math.max(1, readAheadCount); - - RangeScanRequest readAheadRequest = doReadAheadStartingFrom(topic, seqId, readAheadCount); - - if (readAheadRequest != null) { - return readAheadRequest; - } - - // start key was already there in the cache so no readahead happened, - // lets look a little beyond - seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, readAheadCount / 2); - - readAheadRequest = doReadAheadStartingFrom(topic, seqId, readAheadCount / 2); - - return readAheadRequest; - } - - /** - * This method just checks if the provided seq-id already exists in the - * cache. If not, a range read of the specified amount is issued. - * - * @param topic - * @param seqId - * @param readAheadCount - * @return The range read that should be issued - */ - protected RangeScanRequest doReadAheadStartingFrom(ByteString topic, long seqId, int readAheadCount) { - - long startSeqId = seqId; - Queue<CacheKey> installedStubs = new LinkedList<CacheKey>(); - - int i = 0; - - for (; i < readAheadCount; i++) { - CacheKey cacheKey = new CacheKey(topic, seqId); - - // Even if a stub exists, it means that a scan for that is - // outstanding - if (cache.containsKey(cacheKey)) { - break; - } - CacheValue cacheValue = new CacheValue(); - if (null != cache.putIfAbsent(cacheKey, cacheValue)) { - logger.warn("It is unexpected that more than one threads are adding message to cache key {}" - +" at the same time.", cacheKey); - } - - logger.debug("Adding cache stub for: {}", cacheKey); - installedStubs.add(cacheKey); - - seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, 1); - } - - // so how many did we decide to readahead - if (i == 0) { - // no readahead, hence return false - return null; - } - - long readAheadSizeLimit = cfg.getReadAheadSizeBytes(); - ReadAheadScanCallback callback = new ReadAheadScanCallback(installedStubs, topic); - RangeScanRequest rangeScanRequest = new RangeScanRequest(topic, startSeqId, i, readAheadSizeLimit, callback, - null); - - return rangeScanRequest; - - } - - /** - * This is the callback that is used for the range scans. - */ - protected class ReadAheadScanCallback implements ScanCallback { - Queue<CacheKey> installedStubs; - ByteString topic; - - /** - * Constructor - * - * @param installedStubs - * The list of stubs that were installed for this range scan - * @param topic - */ - public ReadAheadScanCallback(Queue<CacheKey> installedStubs, ByteString topic) { - this.installedStubs = installedStubs; - this.topic = topic; - } - - @Override - public void messageScanned(Object ctx, Message message) { - - // Any message we read is potentially useful for us, so lets first - // enqueue it - CacheKey cacheKey = new CacheKey(topic, message.getMsgId().getLocalComponent()); - enqueueWithoutFailureByTopic(topic, new ScanResponse(cacheKey, message)); - - // Now lets see if this message is the one we were expecting - CacheKey expectedKey = installedStubs.peek(); - - if (expectedKey == null) { - // Was not expecting any more messages to come in, but they came - // in so we will keep them - return; - } - - if (expectedKey.equals(cacheKey)) { - // what we got is what we expected, dequeue it so we get the - // next expected one - installedStubs.poll(); - return; - } - - // If reached here, what we scanned was not what we were expecting. - // This means that we have wrong stubs installed in the cache. We - // should remove them, so that whoever is waiting on them can retry. - // This shouldn't be happening usually - logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: " - + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId - + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs); - enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance); - - } - - @Override - public void scanFailed(Object ctx, Exception exception) { - enqueueDeleteOfRemainingStubs(exception); - } - - @Override - public void scanFinished(Object ctx, ReasonForFinish reason) { - // If the scan finished because no more messages are present, its ok - // to leave the stubs in place because they will get filled in as - // new publishes happen. However, if the scan finished due to some - // other reason, e.g., read ahead size limit was reached, we want to - // delete the stubs, so that when the time comes, we can schedule - // another readahead request. - if (reason != ReasonForFinish.NO_MORE_MESSAGES) { - enqueueDeleteOfRemainingStubs(readAheadExceptionInstance); - } - } - - private void enqueueDeleteOfRemainingStubs(Exception reason) { - CacheKey installedStub; - while ((installedStub = installedStubs.poll()) != null) { - enqueueWithoutFailureByTopic(installedStub.getTopic(), - new ExceptionOnCacheKey(installedStub, reason)); - } - } - } - - protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> { - protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory(); - - @Override - public Set<CacheKey> newInstance() { - return new HashSet<CacheKey>(); - } - } - - protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> { - protected final static TreeSetLongFactory instance = new TreeSetLongFactory(); - - @Override - public SortedSet<Long> newInstance() { - return new TreeSet<Long>(); - } - } - - /** - * For adding the message to the cache, we do some bookeeping such as the - * total size of cache, order in which entries were added etc. If the size - * of the cache has exceeded our budget, old entries are collected. - * - * @param cacheKey - * @param message - */ - protected void addMessageToCache(final CacheKey cacheKey, - final Message message, final long currTime) { - logger.debug("Adding msg {} to readahead cache", cacheKey); - - CacheValue cacheValue; - if ((cacheValue = cache.get(cacheKey)) == null) { - cacheValue = new CacheValue(); - CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue); - if (null != oldValue) { - logger.warn("Weird! Should not have two threads adding message to cache key {} at the same time.", - cacheKey); - cacheValue = oldValue; - } - } - - CacheSegment segment = cacheSegment.get(); - if (cacheValue.isStub()) { // update cache size only when cache value is a stub - int size = message.getBody().size(); - - // update the cache size - segment.presentSegmentSize.addAndGet(size); - presentCacheSize.addAndGet(size); - } - - synchronized (cacheValue) { - // finally add the message to the cache - cacheValue.setMessageAndInvokeCallbacks(message, currTime); - } - - // maintain the index of seq-id - // no lock since threads are partitioned by topics - MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), - cacheKey.getSeqId(), TreeSetLongFactory.instance); - - // maintain the time index of addition - MapMethods.addToMultiMap(segment.timeIndexOfAddition, currTime, - cacheKey, HashSetCacheKeyFactory.instance); - - collectOldOrExpiredCacheEntries(segment); - } - - protected void removeMessageFromCache(final CacheKey cacheKey, Exception exception, - final boolean maintainTimeIndex, - final boolean maintainSeqIdIndex) { - CacheValue cacheValue = cache.remove(cacheKey); - - if (cacheValue == null) { - return; - } - - CacheSegment segment = cacheSegment.get(); - - long timeOfAddition = 0; - synchronized (cacheValue) { - if (cacheValue.isStub()) { - cacheValue.setErrorAndInvokeCallbacks(exception); - // Stubs are not present in the indexes, so don't need to maintain - // indexes here - return; - } - - int size = 0 - cacheValue.getMessage().getBody().size(); - presentCacheSize.addAndGet(size); - segment.presentSegmentSize.addAndGet(size); - timeOfAddition = cacheValue.getTimeOfAddition(); - } - - if (maintainSeqIdIndex) { - MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), - cacheKey.getSeqId()); - } - if (maintainTimeIndex) { - MapMethods.removeFromMultiMap(segment.timeIndexOfAddition, - timeOfAddition, cacheKey); - } - } - - /** - * Collection of old entries is simple. Just collect in insert-time order, - * oldest to newest. - */ - protected void collectOldOrExpiredCacheEntries(CacheSegment segment) { - if (cacheEntryTTL > 0) { - // clear expired entries - while (!segment.timeIndexOfAddition.isEmpty()) { - Long earliestTime = segment.timeIndexOfAddition.firstKey(); - if (MathUtils.now() - earliestTime < cacheEntryTTL) { - break; - } - collectCacheEntriesAtTimestamp(segment, earliestTime); - } - } - - while (segment.presentSegmentSize.get() > maxSegmentSize && - !segment.timeIndexOfAddition.isEmpty()) { - Long earliestTime = segment.timeIndexOfAddition.firstKey(); - collectCacheEntriesAtTimestamp(segment, earliestTime); - } - } - - private void collectCacheEntriesAtTimestamp(CacheSegment segment, long timestamp) { - Set<CacheKey> oldCacheEntries = segment.timeIndexOfAddition.get(timestamp); - - // Note: only concrete cache entries, and not stubs are in the time - // index. Hence there can be no callbacks pending on these cache - // entries. Hence safe to remove them directly. - for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) { - final CacheKey cacheKey = iter.next(); - - logger.debug("Removing {} from cache because it's the oldest.", cacheKey); - removeMessageFromCache(cacheKey, readAheadExceptionInstance, // - // maintainTimeIndex= - false, - // maintainSeqIdIndex= - true); - } - - segment.timeIndexOfAddition.remove(timestamp); - } - - /** - * ======================================================================== - * The rest is just simple wrapper classes. - * - */ - - protected class ExceptionOnCacheKey implements CacheRequest { - CacheKey cacheKey; - Exception exception; - - public ExceptionOnCacheKey(CacheKey cacheKey, Exception exception) { - this.cacheKey = cacheKey; - this.exception = exception; - } - - /** - * If for some reason, an outstanding read on a cache stub fails, - * exception for that key is enqueued by the - * {@link ReadAheadScanCallback}. To handle this, we simply send error - * on the callbacks registered for that stub, and delete the entry from - * the cache - */ - @Override - public void performRequest() { - removeMessageFromCache(cacheKey, exception, - // maintainTimeIndex= - true, - // maintainSeqIdIndex= - true); - } - - } - - @SuppressWarnings("serial") - protected static class NoSuchSeqIdException extends Exception { - - public NoSuchSeqIdException() { - super("No such seq-id"); - } - } - - @SuppressWarnings("serial") - protected static class ReadAheadException extends Exception { - public ReadAheadException() { - super("Readahead failed"); - } - } - - public class CancelScanRequestOp implements CacheRequest { - - final CancelScanRequest request; - - public CancelScanRequestOp(CancelScanRequest request) { - this.request = request; - } - - @Override - public void performRequest() { - // cancel scan request - cancelScanRequest(request.getScanRequest()); - } - - void cancelScanRequest(ScanRequest request) { - if (null == request) { - // nothing to cancel - return; - } - - CacheKey cacheKey = new CacheKey(request.getTopic(), request.getStartSeqId()); - CacheValue cacheValue = cache.get(cacheKey); - if (null == cacheValue) { - // cache value is evicted - // so it's callback would be called, we don't need to worry about - // cancel it. since it was treated as executed. - return; - } - cacheValue.removeCallback(request.getCallback(), request.getCtx()); - } - } - - public void cancelScanRequest(ByteString topic, CancelScanRequest request) { - enqueueWithoutFailureByTopic(topic, new CancelScanRequestOp(request)); - } - - protected class ScanResponse implements CacheRequest { - CacheKey cacheKey; - Message message; - - public ScanResponse(CacheKey cacheKey, Message message) { - this.cacheKey = cacheKey; - this.message = message; - } - - @Override - public void performRequest() { - addMessageToCache(cacheKey, message, MathUtils.now()); - } - - } - - protected class DeliveredUntil implements CacheRequest { - ByteString topic; - Long seqId; - - public DeliveredUntil(ByteString topic, Long seqId) { - this.topic = topic; - this.seqId = seqId; - } - - @Override - public void performRequest() { - SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic); - if (orderedSeqIds == null) { - return; - } - - // focus on the set of messages with seq-ids <= the one that - // has been delivered until - SortedSet<Long> headSet = orderedSeqIds.headSet(seqId + 1); - - for (Iterator<Long> iter = headSet.iterator(); iter.hasNext();) { - Long seqId = iter.next(); - CacheKey cacheKey = new CacheKey(topic, seqId); - - logger.debug("Removing {} from cache because every subscriber has moved past", - cacheKey); - - removeMessageFromCache(cacheKey, readAheadExceptionInstance, // - // maintainTimeIndex= - true, - // maintainSeqIdIndex= - false); - iter.remove(); - } - - if (orderedSeqIds.isEmpty()) { - orderedIndexOnSeqId.remove(topic); - } - } - } - - protected class ScanRequestWrapper implements CacheRequest { - ScanRequest request; - - public ScanRequestWrapper(ScanRequest request) { - this.request = request; - } - - /** - * To handle a scan request, we first try to do readahead (which might - * cause a range read to be issued to the underlying persistence - * manager). The readahead will put a stub in the cache, if the message - * is not already present in the cache. The scan callback that is part - * of the scan request is added to this stub, and will be called later - * when the message arrives as a result of the range scan issued to the - * underlying persistence manager. - */ - - @Override - public void performRequest() { - - RangeScanRequest readAheadRequest = doReadAhead(request); - - // Read ahead must have installed at least a stub for us, so this - // can't be null - CacheKey cacheKey = new CacheKey(request.getTopic(), request.getStartSeqId()); - CacheValue cacheValue = cache.get(cacheKey); - if (null == cacheValue) { - logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey); - // reissue the request - scanSingleMessage(request); - return; - } - - synchronized (cacheValue) { - // Add our callback to the stub. If the cache value was already a - // concrete message, the callback will be called right away - cacheValue.addCallback(request.getCallback(), request.getCtx()); - } - - if (readAheadRequest != null) { - realPersistenceManager.scanMessages(readAheadRequest); - } - } - } - - @Override - public void registerJMX(HedwigMBeanInfo parent) { - try { - jmxCacheBean = new ReadAheadCacheBean(this); - HedwigMBeanRegistry.getInstance().register(jmxCacheBean, parent); - } catch (Exception e) { - logger.warn("Failed to register readahead cache with JMX", e); - jmxCacheBean = null; - } - } - - @Override - public void unregisterJMX() { - try { - if (jmxCacheBean != null) { - HedwigMBeanRegistry.getInstance().unregister(jmxCacheBean); - } - } catch (Exception e) { - logger.warn("Failed to unregister readahead cache with JMX", e); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java deleted file mode 100644 index 1f43095..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.persistence; - -import org.apache.hedwig.server.jmx.HedwigMBeanInfo; - -/** - * Read Ahead Cache Bean - */ -public class ReadAheadCacheBean implements ReadAheadCacheMXBean, - HedwigMBeanInfo { - - ReadAheadCache cache; - public ReadAheadCacheBean(ReadAheadCache cache) { - this.cache = cache; - } - - @Override - public String getName() { - return "ReadAheadCache"; - } - - @Override - public boolean isHidden() { - return false; - } - - @Override - public long getMaxCacheSize() { - return cache.cfg.getMaximumCacheSize(); - } - - @Override - public long getPresentCacheSize() { - return cache.presentCacheSize.get(); - } - - @Override - public int getNumCachedEntries() { - return cache.cache.size(); - } - - @Override - public int getNumPendingCacheRequests() { - return cache.numPendingRequests.get(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java deleted file mode 100644 index eba77a0..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.persistence; - -/** - * Read Ahead Cache MBean - */ -public interface ReadAheadCacheMXBean { - - /** - * @return max cache size - */ - public long getMaxCacheSize(); - - /** - * @return present cache size - */ - public long getPresentCacheSize(); - - /** - * @return number of cached entries - */ - public int getNumCachedEntries(); - - /** - * @return number of pending cache requests - */ - public int getNumPendingCacheRequests(); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java deleted file mode 100644 index 42ebb93..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import org.apache.hedwig.protocol.PubSubProtocol.Message; - -public interface ScanCallback { - - enum ReasonForFinish { - NO_MORE_MESSAGES, SIZE_LIMIT_EXCEEDED, NUM_MESSAGES_LIMIT_EXCEEDED - }; - - /** - * This method is called when a message is read from the persistence layer - * as part of a scan. The message just read is handed to this listener which - * can then take the desired action on it. The return value from the method - * indicates whether the scan should continue or not. - * - * @param ctx - * The context for the callback - * @param message - * The message just scanned from the log - * @return true if the scan should continue, false otherwise - */ - public void messageScanned(Object ctx, Message message); - - /** - * This method is called when the scan finishes - * - * - * @param ctx - * @param reason - */ - - public abstract void scanFinished(Object ctx, ReasonForFinish reason); - - /** - * This method is called when the operation failed due to some reason. The - * reason for failure is passed in. - * - * @param ctx - * The context for the callback - * @param exception - * The reason for the failure of the scan - */ - public abstract void scanFailed(Object ctx, Exception exception); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java deleted file mode 100644 index a39a197..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -public class ScanCallbackWithContext { - ScanCallback scanCallback; - Object ctx; - - public ScanCallbackWithContext(ScanCallback callback, Object ctx) { - this.scanCallback = callback; - this.ctx = ctx; - } - - public ScanCallback getScanCallback() { - return scanCallback; - } - - public Object getCtx() { - return ctx; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ScanCallbackWithContext)) { - return false; - } - ScanCallbackWithContext otherCb = - (ScanCallbackWithContext) other; - // Ensure that it was same callback & same ctx - return scanCallback == otherCb.scanCallback && - ctx == otherCb.ctx; - } - - @Override - public int hashCode() { - return scanCallback.hashCode(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java deleted file mode 100644 index c985840..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.persistence; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; - -/** - * Encapsulates a request for reading a single message. The message on the given - * topic <b>at</b> the given seqId is scanned. A call-back {@link ScanCallback} - * is provided. When the message is scanned, the - * {@link ScanCallback#messageScanned(Object, Message)} method is called. Since - * there is only 1 record to be scanned the - * {@link ScanCallback#operationFinished(Object)} method may not be called since - * its redundant. - * {@link ScanCallback#scanFailed(Object, org.apache.hedwig.exceptions.PubSubException)} - * method is called in case of error. - * - */ -public class ScanRequest { - ByteString topic; - long startSeqId; - ScanCallback callback; - Object ctx; - - public ScanRequest(ByteString topic, long startSeqId, ScanCallback callback, Object ctx) { - this.topic = topic; - this.startSeqId = startSeqId; - this.callback = callback; - this.ctx = ctx; - } - - public ByteString getTopic() { - return topic; - } - - public long getStartSeqId() { - return startSeqId; - } - - public ScanCallback getCallback() { - return callback; - } - - public Object getCtx() { - return ctx; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java deleted file mode 100644 index 10f0889..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.proxy; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; - -import org.jboss.netty.channel.Channel; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.TopicBusyException; -import org.apache.hedwig.server.handlers.ChannelDisconnectListener; -import org.apache.hedwig.util.Callback; - -public class ChannelTracker implements ChannelDisconnectListener { - HashMap<TopicSubscriber, Channel> topicSub2Channel = new HashMap<TopicSubscriber, Channel>(); - HashMap<Channel, List<TopicSubscriber>> channel2TopicSubs = new HashMap<Channel, List<TopicSubscriber>>(); - Subscriber subscriber; - - public ChannelTracker(Subscriber subscriber) { - this.subscriber = subscriber; - } - - static Callback<Void> noOpCallback = new Callback<Void>() { - public void operationFailed(Object ctx, PubSubException exception) { - }; - - public void operationFinished(Object ctx, Void resultOfOperation) { - }; - }; - - public synchronized void channelDisconnected(Channel channel) { - List<TopicSubscriber> topicSubs = channel2TopicSubs.remove(channel); - - if (topicSubs == null) { - return; - } - - for (TopicSubscriber topicSub : topicSubs) { - topicSub2Channel.remove(topicSub); - subscriber.asyncCloseSubscription(topicSub.getTopic(), topicSub.getSubscriberId(), noOpCallback, null); - } - } - - public synchronized void subscribeSucceeded(TopicSubscriber topicSubscriber, Channel channel) - throws TopicBusyException { - - if (!channel.isConnected()) { - // channel got disconnected while we were processing the - // subscribe request, nothing much we can do in this case - return; - } - - if (topicSub2Channel.containsKey(topicSubscriber)) { - TopicBusyException pse = new PubSubException.TopicBusyException( - "subscription for this topic, subscriberId is already being served on a different channel"); - throw pse; - } - - topicSub2Channel.put(topicSubscriber, channel); - - List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel); - - if (topicSubs == null) { - topicSubs = new LinkedList<TopicSubscriber>(); - channel2TopicSubs.put(channel, topicSubs); - } - topicSubs.add(topicSubscriber); - - } - - public void aboutToCloseSubscription(ByteString topic, ByteString subscriberId) { - removeSubscriber(topic, subscriberId); - } - - public void aboutToUnsubscribe(ByteString topic, ByteString subscriberId) { - removeSubscriber(topic, subscriberId); - } - - private synchronized void removeSubscriber(ByteString topic, ByteString subscriberId) { - TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId); - - Channel channel = topicSub2Channel.remove(topicSub); - - if (channel != null) { - List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel); - if (topicSubs != null) { - topicSubs.remove(topicSub); - } - } - } - - public synchronized void checkChannelMatches(ByteString topic, ByteString subscriberId, Channel channel) - throws PubSubException { - Channel subscribedChannel = getChannel(topic, subscriberId); - - if (subscribedChannel == null) { - throw new PubSubException.ClientNotSubscribedException( - "Can't start delivery since client is not subscribed"); - } - - if (subscribedChannel != channel) { - throw new PubSubException.TopicBusyException( - "Can't start delivery since client is subscribed on a different channel"); - } - - } - - public synchronized Channel getChannel(ByteString topic, ByteString subscriberId) { - return topicSub2Channel.get(new TopicSubscriber(topic, subscriberId)); - } - -}