http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java deleted file mode 100644 index 0480b22..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java +++ /dev/null @@ -1,978 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.delivery; - -import static org.apache.hedwig.util.VarArgs.va; - -import java.util.Comparator; -import java.util.HashSet; -import java.util.Queue; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.filter.ServerMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.common.UnexpectedError; -import org.apache.hedwig.server.handlers.SubscriptionChannelManager.SubChannelDisconnectedListener; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.persistence.CancelScanRequest; -import org.apache.hedwig.server.persistence.Factory; -import org.apache.hedwig.server.persistence.MapMethods; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.persistence.ReadAheadCache; -import org.apache.hedwig.server.persistence.ScanCallback; -import org.apache.hedwig.server.persistence.ScanRequest; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class FIFODeliveryManager implements DeliveryManager, SubChannelDisconnectedListener { - - protected static final Logger logger = LoggerFactory.getLogger(FIFODeliveryManager.class); - - private static Callback<Void> NOP_CALLBACK = new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void result) { - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - } - }; - - protected interface DeliveryManagerRequest { - public void performRequest(); - } - - /** - * Stores a mapping from topic to the delivery pointers on the topic. The - * delivery pointers are stored in a sorted map from seq-id to the set of - * subscribers at that seq-id - */ - ConcurrentMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs; - - /** - * Mapping from delivery end point to the subscriber state that we are - * serving at that end point. This prevents us e.g., from serving two - * subscriptions to the same endpoint - */ - ConcurrentMap<TopicSubscriber, ActiveSubscriberState> subscriberStates; - - private final ReadAheadCache cache; - private final PersistenceManager persistenceMgr; - private TopicManager tm; - private ServerConfiguration cfg; - - private final int numDeliveryWorkers; - private final DeliveryWorker[] deliveryWorkers; - - private class DeliveryWorker implements Runnable { - - BlockingQueue<DeliveryManagerRequest> requestQueue = - new LinkedBlockingQueue<DeliveryManagerRequest>();; - - /** - * The queue of all subscriptions that are facing a transient error either - * in scanning from the persistence manager, or in sending to the consumer - */ - Queue<ActiveSubscriberState> retryQueue = - new PriorityBlockingQueue<ActiveSubscriberState>(32, new Comparator<ActiveSubscriberState>() { - @Override - public int compare(ActiveSubscriberState as1, ActiveSubscriberState as2) { - long s = as1.lastScanErrorTime - as2.lastScanErrorTime; - return s > 0 ? 1 : (s < 0 ? -1 : 0); - } - }); - - // Boolean indicating if this thread should continue running. This is used - // when we want to stop the thread during a PubSubServer shutdown. - protected volatile boolean keepRunning = true; - private final Thread workerThread; - private final int idx; - - private final Object suspensionLock = new Object(); - private boolean suspended = false; - - DeliveryWorker(int index) { - this.idx = index; - workerThread = new Thread(this, "DeliveryManagerThread-" + index); - } - - void start() { - workerThread.start(); - } - - /** - * Stop method which will enqueue a ShutdownDeliveryManagerRequest. - */ - void stop() { - enqueueWithoutFailure(new ShutdownDeliveryManagerRequest()); - } - - /** - * Stop FIFO delivery worker from processing requests. (for testing) - */ - void suspendProcessing() { - synchronized(suspensionLock) { - suspended = true; - } - } - - /** - * Resume FIFO delivery worker. (for testing) - */ - void resumeProcessing() { - synchronized(suspensionLock) { - suspended = false; - suspensionLock.notify(); - } - } - - @Override - public void run() { - while (keepRunning) { - DeliveryManagerRequest request = null; - - try { - // We use a timeout of 1 second, so that we can wake up once in - // a while to check if there is something in the retry queue. - request = requestQueue.poll(1, TimeUnit.SECONDS); - synchronized(suspensionLock) { - while (suspended) { - suspensionLock.wait(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - // First retry any subscriptions that had failed and need a retry - retryErroredSubscribers(); - - if (request == null) { - continue; - } - - request.performRequest(); - - } - } - - protected void enqueueWithoutFailure(DeliveryManagerRequest request) { - if (!requestQueue.offer(request)) { - throw new UnexpectedError("Could not enqueue object: " + request - + " to request queue for delivery worker ." + idx); - } - } - - public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) { - subscriber.setLastScanErrorTime(MathUtils.now()); - - if (!retryQueue.offer(subscriber)) { - throw new UnexpectedError("Could not enqueue to retry queue for delivery worker " + idx); - } - } - - public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) { - subscriber.clearLastScanErrorTime(); - if (!retryQueue.offer(subscriber)) { - throw new UnexpectedError("Could not enqueue to delivery manager retry queue"); - } - // no request in request queue now - // issue a empty delivery request to not waiting for polling requests queue - if (requestQueue.isEmpty()) { - enqueueWithoutFailure(new DeliveryManagerRequest() { - @Override - public void performRequest() { - // do nothing - } - }); - } - } - - protected void retryErroredSubscribers() { - long lastInterestingFailureTime = MathUtils.now() - cfg.getScanBackoffPeriodMs(); - ActiveSubscriberState subscriber; - - while ((subscriber = retryQueue.peek()) != null) { - if (subscriber.getLastScanErrorTime() > lastInterestingFailureTime) { - // Not enough time has elapsed yet, will retry later - // Since the queue is fifo, no need to check later items - return; - } - - // retry now - subscriber.deliverNextMessage(); - retryQueue.poll(); - } - } - - protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest { - // This is a simple type of Request we will enqueue when the - // PubSubServer is shut down and we want to stop the DeliveryManager - // thread. - @Override - public void performRequest() { - keepRunning = false; - } - } - - } - - - - public FIFODeliveryManager(TopicManager tm, PersistenceManager persistenceMgr, - ServerConfiguration cfg) { - this.tm = tm; - this.persistenceMgr = persistenceMgr; - if (persistenceMgr instanceof ReadAheadCache) { - this.cache = (ReadAheadCache) persistenceMgr; - } else { - this.cache = null; - } - perTopicDeliveryPtrs = - new ConcurrentHashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>(); - subscriberStates = - new ConcurrentHashMap<TopicSubscriber, ActiveSubscriberState>(); - this.cfg = cfg; - // initialize the delivery workers - this.numDeliveryWorkers = cfg.getNumDeliveryThreads(); - this.deliveryWorkers = new DeliveryWorker[numDeliveryWorkers]; - for (int i=0; i<numDeliveryWorkers; i++) { - deliveryWorkers[i] = new DeliveryWorker(i); - } - } - - @Override - public void start() { - for (int i=0; i<numDeliveryWorkers; i++) { - deliveryWorkers[i].start(); - } - } - - /** - * Stop FIFO delivery manager from processing requests. (for testing) - */ - @VisibleForTesting - public void suspendProcessing() { - for (int i=0; i<numDeliveryWorkers; i++) { - deliveryWorkers[i].suspendProcessing(); - } - } - - /** - * Resume FIFO delivery manager. (for testing) - */ - @VisibleForTesting - public void resumeProcessing() { - for (int i=0; i<numDeliveryWorkers; i++) { - deliveryWorkers[i].resumeProcessing(); - } - } - - /** - * Stop the FIFO delivery manager. - */ - @Override - public void stop() { - for (int i=0; i<numDeliveryWorkers; i++) { - deliveryWorkers[i].stop(); - } - } - - private DeliveryWorker getDeliveryWorker(ByteString topic) { - return deliveryWorkers[MathUtils.signSafeMod(topic.hashCode(), numDeliveryWorkers)]; - } - - /** - * ===================================================================== Our - * usual enqueue function, stop if error because of unbounded queue, should - * never happen - * - */ - protected void enqueueWithoutFailure(ByteString topic, DeliveryManagerRequest request) { - getDeliveryWorker(topic).enqueueWithoutFailure(request); - } - - /** - * Tells the delivery manager to start sending out messages for a particular - * subscription - * - * @param topic - * @param subscriberId - * @param seqIdToStartFrom - * Message sequence-id from where delivery should be started - * @param endPoint - * The delivery end point to which send messages to - * @param filter - * Only messages passing this filter should be sent to this - * subscriber - * @param callback - * Callback instance - * @param ctx - * Callback context - */ - @Override - public void startServingSubscription(ByteString topic, ByteString subscriberId, - SubscriptionPreferences preferences, - MessageSeqId seqIdToStartFrom, - DeliveryEndPoint endPoint, ServerMessageFilter filter, - Callback<Void> callback, Object ctx) { - ActiveSubscriberState subscriber = - new ActiveSubscriberState(topic, subscriberId, - preferences, - seqIdToStartFrom.getLocalComponent() - 1, - endPoint, filter, callback, ctx); - - enqueueWithoutFailure(topic, subscriber); - } - - public void stopServingSubscriber(ByteString topic, ByteString subscriberId, - SubscriptionEvent event, - Callback<Void> cb, Object ctx) { - enqueueWithoutFailure(topic, new StopServingSubscriber(topic, subscriberId, event, cb, ctx)); - } - - /** - * Instructs the delivery manager to backoff on the given subscriber and - * retry sending after some time - * - * @param subscriber - */ - public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) { - getDeliveryWorker(subscriber.getTopic()).retryErroredSubscriberAfterDelay(subscriber); - } - - public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) { - getDeliveryWorker(subscriber.getTopic()).clearRetryDelayForSubscriber(subscriber); - } - - // TODO: for now, I don't move messageConsumed request to delivery manager thread, - // which is supposed to be fixed in {@link https://issues.apache.org/jira/browse/BOOKKEEPER-503} - @Override - public void messageConsumed(ByteString topic, ByteString subscriberId, - MessageSeqId consumedSeqId) { - ActiveSubscriberState subState = - subscriberStates.get(new TopicSubscriber(topic, subscriberId)); - if (null == subState) { - return; - } - subState.messageConsumed(consumedSeqId.getLocalComponent()); - } - - /** - * Instructs the delivery manager to move the delivery pointer for a given - * subscriber - * - * @param subscriber - * @param prevSeqId - * @param newSeqId - */ - public void moveDeliveryPtrForward(ActiveSubscriberState subscriber, long prevSeqId, long newSeqId) { - enqueueWithoutFailure(subscriber.getTopic(), - new DeliveryPtrMove(subscriber, prevSeqId, newSeqId)); - } - - protected void removeDeliveryPtr(ActiveSubscriberState subscriber, Long seqId, boolean isAbsenceOk, - boolean pruneTopic) { - - assert seqId != null; - - // remove this subscriber from the delivery pointers data structure - ByteString topic = subscriber.getTopic(); - SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic); - - if (deliveryPtrs == null && !isAbsenceOk) { - throw new UnexpectedError("No delivery pointers found while disconnecting " + "channel for topic:" + topic); - } - - if(null == deliveryPtrs) { - return; - } - - if (!MapMethods.removeFromMultiMap(deliveryPtrs, seqId, subscriber) && !isAbsenceOk) { - - throw new UnexpectedError("Could not find subscriber:" + subscriber + " at the expected delivery pointer"); - } - - if (pruneTopic && deliveryPtrs.isEmpty()) { - perTopicDeliveryPtrs.remove(topic); - } - - } - - protected long getMinimumSeqId(ByteString topic) { - SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic); - - if (deliveryPtrs == null || deliveryPtrs.isEmpty()) { - return Long.MAX_VALUE - 1; - } - return deliveryPtrs.firstKey(); - } - - protected void addDeliveryPtr(ActiveSubscriberState subscriber, Long seqId) { - - // If this topic doesn't exist in the per-topic delivery pointers table, - // create an entry for it - SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = MapMethods.getAfterInsertingIfAbsent( - perTopicDeliveryPtrs, subscriber.getTopic(), TreeMapLongToSetSubscriberFactory.instance); - - MapMethods.addToMultiMap(deliveryPtrs, seqId, subscriber, HashMapSubscriberFactory.instance); - } - - public class ActiveSubscriberState - implements ScanCallback, DeliveryCallback, DeliveryManagerRequest, CancelScanRequest { - - static final int UNLIMITED = 0; - - ByteString topic; - ByteString subscriberId; - long lastLocalSeqIdDelivered; - boolean connected = true; - ReentrantReadWriteLock connectedLock = new ReentrantReadWriteLock(); - DeliveryEndPoint deliveryEndPoint; - long lastScanErrorTime = -1; - long localSeqIdDeliveringNow; - long lastSeqIdCommunicatedExternally; - long lastSeqIdConsumedUtil; - boolean isThrottled = false; - final int messageWindowSize; - ServerMessageFilter filter; - Callback<Void> cb; - Object ctx; - - // track the outstanding scan request - // so we could cancel it - ScanRequest outstandingScanRequest; - - final static int SEQ_ID_SLACK = 10; - - public ActiveSubscriberState(ByteString topic, ByteString subscriberId, - SubscriptionPreferences preferences, - long lastLocalSeqIdDelivered, - DeliveryEndPoint deliveryEndPoint, - ServerMessageFilter filter, - Callback<Void> cb, Object ctx) { - this.topic = topic; - this.subscriberId = subscriberId; - this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered; - this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered; - this.deliveryEndPoint = deliveryEndPoint; - this.filter = filter; - if (preferences.hasMessageWindowSize()) { - messageWindowSize = preferences.getMessageWindowSize(); - } else { - if (FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize() > 0) { - messageWindowSize = - FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize(); - } else { - messageWindowSize = UNLIMITED; - } - } - this.cb = cb; - this.ctx = ctx; - } - - public void setNotConnected(SubscriptionEvent event) { - this.connectedLock.writeLock().lock(); - try { - // have closed it. - if (!connected) { - return; - } - this.connected = false; - // put itself in ReadAhead queue to cancel outstanding scan request - // if outstanding scan request callback before cancel op executed, - // nothing it would cancel. - if (null != cache && null != outstandingScanRequest) { - cache.cancelScanRequest(topic, this); - } - } finally { - this.connectedLock.writeLock().unlock(); - } - - if (null != event && - (SubscriptionEvent.TOPIC_MOVED == event || - SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED == event)) { - // we should not close the channel now after enabling multiplexing - PubSubResponse response = PubSubResponseUtils.getResponseForSubscriptionEvent( - topic, subscriberId, event - ); - deliveryEndPoint.send(response, new DeliveryCallback() { - @Override - public void sendingFinished() { - // do nothing now - } - @Override - public void transientErrorOnSend() { - // do nothing now - } - @Override - public void permanentErrorOnSend() { - // if channel is broken, close the channel - deliveryEndPoint.close(); - } - }); - } - // uninitialize filter - this.filter.uninitialize(); - } - - public ByteString getTopic() { - return topic; - } - - public synchronized long getLastScanErrorTime() { - return lastScanErrorTime; - } - - public synchronized void setLastScanErrorTime(long lastScanErrorTime) { - this.lastScanErrorTime = lastScanErrorTime; - } - - /** - * Clear the last scan error time so it could be retry immediately. - */ - protected synchronized void clearLastScanErrorTime() { - this.lastScanErrorTime = -1; - } - - protected boolean isConnected() { - connectedLock.readLock().lock(); - try { - return connected; - } finally { - connectedLock.readLock().unlock(); - } - } - - protected synchronized void messageConsumed(long newSeqIdConsumed) { - if (newSeqIdConsumed <= lastSeqIdConsumedUtil) { - return; - } - if (logger.isDebugEnabled()) { - logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.", - va(this, lastSeqIdConsumedUtil, newSeqIdConsumed)); - } - lastSeqIdConsumedUtil = newSeqIdConsumed; - // after updated seq id check whether it still exceed msg limitation - if (msgLimitExceeded()) { - return; - } - if (isThrottled) { - isThrottled = false; - logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.", - va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil)); - - enqueueWithoutFailure(topic, new DeliveryManagerRequest() { - @Override - public void performRequest() { - // enqueue - clearRetryDelayForSubscriber(ActiveSubscriberState.this); - } - }); - } - } - - protected boolean msgLimitExceeded() { - if (messageWindowSize == UNLIMITED) { - return false; - } - if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= messageWindowSize) { - return true; - } - return false; - } - - public void deliverNextMessage() { - connectedLock.readLock().lock(); - try { - doDeliverNextMessage(); - } finally { - connectedLock.readLock().unlock(); - } - } - - private void doDeliverNextMessage() { - if (!connected) { - return; - } - - synchronized (this) { - // check whether we have delivered enough messages without receiving their consumes - if (msgLimitExceeded()) { - logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.", - va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil)); - isThrottled = true; - // do nothing, since the delivery process would be throttled. - // After message consumed, it would be added back to retry queue. - return; - } - - localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1); - - outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow, - /* callback= */this, /* ctx= */null); - } - - persistenceMgr.scanSingleMessage(outstandingScanRequest); - } - - /** - * =============================================================== - * {@link CancelScanRequest} methods - * - * This method runs ins same threads with ScanCallback. When it runs, - * it checked whether it is outstanding scan request. if there is one, - * cancel it. - */ - @Override - public ScanRequest getScanRequest() { - // no race between cancel request and scan callback - // the only race is between stopServing and deliverNextMessage - // deliverNextMessage would be executed in netty callback which is in netty thread - // stopServing is run in delivery thread. if stopServing runs before deliverNextMessage - // deliverNextMessage would have chance to put a stub in ReadAheadCache - // then we don't have any chance to cancel it. - // use connectedLock to avoid such race. - return outstandingScanRequest; - } - - private boolean checkConnected() { - connectedLock.readLock().lock(); - try { - // message scanned means the outstanding request is executed - outstandingScanRequest = null; - return connected; - } finally { - connectedLock.readLock().unlock(); - } - } - - /** - * =============================================================== - * {@link ScanCallback} methods - */ - - public void messageScanned(Object ctx, Message message) { - if (!checkConnected()) { - return; - } - - // only increment topic access times when tried to deliver a message - // for those subscribers just waiting for a published for a long time - // we don't increment topic access times, so the topic would be evicted - // in future. - tm.incrementTopicAccessTimes(topic); - - if (!filter.testMessage(message)) { - // for filtered out messages, we don't deliver the message to client, so we would not - // receive its consume request which moves the <i>lastSeqIdConsumedUtil</i> pointer. - // we move the <i>lastSeqIdConsumedUtil</i> here for filtered out messages, which would - // avoid a subscriber being throttled due to the message gap introduced by filtering. - // - // it is OK to move <i>lastSeqIdConsumedUtil</i> here, since this pointer is subscriber's - // delivery state which to trottling deliver. changing <i>lastSeqIdConsumedUtil</i> would - // not affect the subscriber's consume pointer in zookeeper which is managed in subscription - // manager. - // - // And marking message consumed before calling sending finished, would avoid the subscriber - // being throttled first and released from throttled state laster. - messageConsumed(message.getMsgId().getLocalComponent()); - sendingFinished(); - return; - } - - /** - * The method below will invoke our sendingFinished() method when - * done - */ - PubSubResponse response = PubSubResponse.newBuilder() - .setProtocolVersion(ProtocolVersion.VERSION_ONE) - .setStatusCode(StatusCode.SUCCESS).setTxnId(0) - .setMessage(message).setTopic(topic) - .setSubscriberId(subscriberId).build(); - - deliveryEndPoint.send(response, // - // callback = - this); - - } - - @Override - public void scanFailed(Object ctx, Exception exception) { - if (!checkConnected()) { - return; - } - - // wait for some time and then retry - retryErroredSubscriberAfterDelay(this); - } - - @Override - public void scanFinished(Object ctx, ReasonForFinish reason) { - checkConnected(); - } - - /** - * =============================================================== - * {@link DeliveryCallback} methods - */ - @Override - public void sendingFinished() { - if (!isConnected()) { - return; - } - - synchronized (this) { - lastLocalSeqIdDelivered = localSeqIdDeliveringNow; - - if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK) { - // Note: The order of the next 2 statements is important. We should - // submit a request to change our delivery pointer only *after* we - // have actually changed it. Otherwise, there is a race condition - // with removal of this channel, w.r.t, maintaining the deliveryPtrs - // tree map. - long prevId = lastSeqIdCommunicatedExternally; - lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered; - moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered); - } - } - // increment deliveried message - ServerStats.getInstance().incrementMessagesDelivered(); - deliverNextMessage(); - } - - public synchronized long getLastSeqIdCommunicatedExternally() { - return lastSeqIdCommunicatedExternally; - } - - - @Override - public void permanentErrorOnSend() { - // the underlying channel is broken, the channel will - // be closed in UmbrellaHandler when exception happened. - // so we don't need to close the channel again - stopServingSubscriber(topic, subscriberId, null, - NOP_CALLBACK, null); - } - - @Override - public void transientErrorOnSend() { - retryErroredSubscriberAfterDelay(this); - } - - /** - * =============================================================== - * {@link DeliveryManagerRequest} methods - */ - @Override - public void performRequest() { - // Put this subscriber in the channel to subscriber mapping - ActiveSubscriberState prevSubscriber = - subscriberStates.put(new TopicSubscriber(topic, subscriberId), this); - - // after put the active subscriber in subscriber states mapping - // trigger the callback to tell it started to deliver the message - // should let subscriber response go first before first delivered message. - cb.operationFinished(ctx, (Void)null); - - if (prevSubscriber != null) { - // we already in the delivery thread, we don't need to equeue a stop request - // just stop it now, since stop is not blocking operation. - // and also it cleans the old state of the active subscriber immediately. - SubscriptionEvent se; - if (deliveryEndPoint.equals(prevSubscriber.deliveryEndPoint)) { - logger.debug("Subscriber {} replaced a duplicated subscriber {} at same delivery point {}.", - va(this, prevSubscriber, deliveryEndPoint)); - se = null; - } else { - logger.debug("Subscriber {} from delivery point {} forcelly closed delivery point {}.", - va(this, deliveryEndPoint, prevSubscriber.deliveryEndPoint)); - se = SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED; - } - doStopServingSubscriber(prevSubscriber, se); - } - - synchronized (this) { - lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered; - addDeliveryPtr(this, lastLocalSeqIdDelivered); - } - - deliverNextMessage(); - }; - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Topic: "); - sb.append(topic.toStringUtf8()); - sb.append("Subscriber: "); - sb.append(subscriberId.toStringUtf8()); - sb.append(", DeliveryPtr: "); - sb.append(lastLocalSeqIdDelivered); - return sb.toString(); - - } - } - - protected class StopServingSubscriber implements DeliveryManagerRequest { - TopicSubscriber ts; - SubscriptionEvent event; - final Callback<Void> cb; - final Object ctx; - - public StopServingSubscriber(ByteString topic, ByteString subscriberId, - SubscriptionEvent event, - Callback<Void> callback, Object ctx) { - this.ts = new TopicSubscriber(topic, subscriberId); - this.event = event; - this.cb = callback; - this.ctx = ctx; - } - - @Override - public void performRequest() { - ActiveSubscriberState subscriber = subscriberStates.remove(ts); - if (null != subscriber) { - doStopServingSubscriber(subscriber, event); - } - cb.operationFinished(ctx, null); - } - - } - - /** - * Stop serving a subscriber. This method should be called in a - * {@link DeliveryManagerRequest}. - * - * @param subscriber - * Active Subscriber to stop - * @param event - * Subscription Event for the stop reason - */ - private void doStopServingSubscriber(ActiveSubscriberState subscriber, SubscriptionEvent event) { - // This will automatically stop delivery, and disconnect the channel - subscriber.setNotConnected(event); - - // if the subscriber has moved on, a move request for its delivery - // pointer must be pending in the request queue. Note that the - // subscriber first changes its delivery pointer and then submits a - // request to move so this works. - removeDeliveryPtr(subscriber, subscriber.getLastSeqIdCommunicatedExternally(), // - // isAbsenceOk= - true, - // pruneTopic= - true); - } - - protected class DeliveryPtrMove implements DeliveryManagerRequest { - - ActiveSubscriberState subscriber; - Long oldSeqId; - Long newSeqId; - - public DeliveryPtrMove(ActiveSubscriberState subscriber, Long oldSeqId, Long newSeqId) { - this.subscriber = subscriber; - this.oldSeqId = oldSeqId; - this.newSeqId = newSeqId; - } - - @Override - public void performRequest() { - ByteString topic = subscriber.getTopic(); - long prevMinSeqId = getMinimumSeqId(topic); - - if (subscriber.isConnected()) { - removeDeliveryPtr(subscriber, oldSeqId, // - // isAbsenceOk= - false, - // pruneTopic= - false); - - addDeliveryPtr(subscriber, newSeqId); - } else { - removeDeliveryPtr(subscriber, oldSeqId, // - // isAbsenceOk= - true, - // pruneTopic= - true); - } - - long nowMinSeqId = getMinimumSeqId(topic); - - if (nowMinSeqId > prevMinSeqId) { - persistenceMgr.deliveredUntil(topic, nowMinSeqId); - } - } - } - - /** - * ==================================================================== - * - * Dumb factories for our map methods - */ - protected static class TreeMapLongToSetSubscriberFactory implements - Factory<SortedMap<Long, Set<ActiveSubscriberState>>> { - static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory(); - - @Override - public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() { - return new TreeMap<Long, Set<ActiveSubscriberState>>(); - } - } - - protected static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> { - static HashMapSubscriberFactory instance = new HashMapSubscriberFactory(); - - @Override - public Set<ActiveSubscriberState> newInstance() { - return new HashSet<ActiveSubscriberState>(); - } - } - - @Override - public void onSubChannelDisconnected(TopicSubscriber topicSubscriber) { - stopServingSubscriber(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), - null, NOP_CALLBACK, null); - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java deleted file mode 100644 index 4189eb6..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; - -public abstract class BaseHandler implements Handler { - - protected TopicManager topicMgr; - protected ServerConfiguration cfg; - - protected BaseHandler(TopicManager tm, ServerConfiguration cfg) { - this.topicMgr = tm; - this.cfg = cfg; - } - - - public void handleRequest(final PubSubRequest request, final Channel channel) { - topicMgr.getOwner(request.getTopic(), request.getShouldClaim(), - new Callback<HedwigSocketAddress>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())); - ServerStats.getInstance().getOpStats(request.getType()).incrementFailedOps(); - } - - @Override - public void operationFinished(Object ctx, HedwigSocketAddress owner) { - if (!owner.equals(cfg.getServerAddr())) { - channel.write(PubSubResponseUtils.getResponseForException( - new ServerNotResponsibleForTopicException(owner.toString()), request.getTxnId())); - ServerStats.getInstance().incrementRequestsRedirect(); - return; - } - handleRequestAtOwner(request, channel); - } - }, null); - } - - public abstract void handleRequestAtOwner(PubSubRequest request, Channel channel); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java deleted file mode 100644 index 458d301..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.jboss.netty.channel.Channel; - -public interface ChannelDisconnectListener { - - /** - * Act on a particular channel being disconnected - * @param channel - */ - public void channelDisconnected(Channel channel); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java deleted file mode 100644 index a6ccb7e..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFutureListener; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.netty.ServerStats.OpStats; -import org.apache.hedwig.server.netty.UmbrellaHandler; -import org.apache.hedwig.server.subscriptions.SubscriptionManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class CloseSubscriptionHandler extends BaseHandler { - SubscriptionManager subMgr; - DeliveryManager deliveryMgr; - SubscriptionChannelManager subChannelMgr; - // op stats - final OpStats closesubStats; - - public CloseSubscriptionHandler(ServerConfiguration cfg, TopicManager tm, - SubscriptionManager subMgr, - DeliveryManager deliveryMgr, - SubscriptionChannelManager subChannelMgr) { - super(tm, cfg); - this.subMgr = subMgr; - this.deliveryMgr = deliveryMgr; - this.subChannelMgr = subChannelMgr; - closesubStats = ServerStats.getInstance().getOpStats(OperationType.CLOSESUBSCRIPTION); - } - - @Override - public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) { - if (!request.hasCloseSubscriptionRequest()) { - UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(), - "Missing closesubscription request data"); - closesubStats.incrementFailedOps(); - return; - } - - final CloseSubscriptionRequest closesubRequest = - request.getCloseSubscriptionRequest(); - final ByteString topic = request.getTopic(); - final ByteString subscriberId = closesubRequest.getSubscriberId(); - - final long requestTime = System.currentTimeMillis(); - - subMgr.closeSubscription(topic, subscriberId, new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void result) { - // we should not close the channel in delivery manager - // since client waits the response for closeSubscription request - // client side would close the channel - deliveryMgr.stopServingSubscriber(topic, subscriberId, null, - new Callback<Void>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())); - closesubStats.incrementFailedOps(); - } - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - // remove the topic subscription from subscription channels - subChannelMgr.remove(new TopicSubscriber(topic, subscriberId), - channel); - channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId())); - closesubStats.updateLatency(System.currentTimeMillis() - requestTime); - } - }, null); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())); - closesubStats.incrementFailedOps(); - } - }, null); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java deleted file mode 100644 index 5042a37..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.netty.UmbrellaHandler; -import org.apache.hedwig.server.netty.ServerStats.OpStats; -import org.apache.hedwig.server.subscriptions.SubscriptionManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class ConsumeHandler extends BaseHandler { - - SubscriptionManager sm; - Callback<Void> noopCallback = new NoopCallback<Void>(); - final OpStats consumeStats = ServerStats.getInstance().getOpStats(OperationType.CONSUME); - - class NoopCallback<T> implements Callback<T> { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - consumeStats.incrementFailedOps(); - } - - public void operationFinished(Object ctx, T resultOfOperation) { - // we don't collect consume process time - consumeStats.updateLatency(0); - }; - } - - @Override - public void handleRequestAtOwner(PubSubRequest request, Channel channel) { - if (!request.hasConsumeRequest()) { - UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(), - "Missing consume request data"); - consumeStats.incrementFailedOps(); - return; - } - - ConsumeRequest consumeRequest = request.getConsumeRequest(); - - sm.setConsumeSeqIdForSubscriber(request.getTopic(), consumeRequest.getSubscriberId(), - consumeRequest.getMsgId(), noopCallback, null); - - } - - public ConsumeHandler(TopicManager tm, SubscriptionManager sm, ServerConfiguration cfg) { - super(tm, cfg); - this.sm = sm; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java deleted file mode 100644 index c391f5c..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; - -public interface Handler { - - /** - * Handle a request synchronously or asynchronously. After handling the - * request, the appropriate response should be written on the given channel - * - * @param request - * The request to handle - * - * @param channel - * The channel on which to write the response - */ - public void handleRequest(final PubSubRequest request, final Channel channel); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java deleted file mode 100644 index e0f1487..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.handlers; - -import org.apache.hedwig.server.handlers.SubscriptionChannelManager; -import org.apache.hedwig.server.jmx.HedwigMBeanInfo; - -public class NettyHandlerBean implements NettyHandlerMXBean, HedwigMBeanInfo { - - SubscriptionChannelManager subChannelMgr; - - public NettyHandlerBean(SubscriptionChannelManager subChannelMgr) { - this.subChannelMgr = subChannelMgr; - } - - @Override - public String getName() { - return "NettyHandlers"; - } - - @Override - public boolean isHidden() { - return false; - } - - @Override - public int getNumSubscriptionChannels() { - return subChannelMgr.getNumSubscriptionChannels(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java deleted file mode 100644 index ab8af29..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.handlers; - -/** - * Netty Handler MBean - */ -public interface NettyHandlerMXBean { - - /** - * @return number of subscription channels - */ - public int getNumSubscriptionChannels(); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java deleted file mode 100644 index 587f904..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.apache.hedwig.protocol.PubSubProtocol; -import org.jboss.netty.channel.Channel; -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.netty.ServerStats.OpStats; -import org.apache.hedwig.server.netty.UmbrellaHandler; -import org.apache.hedwig.server.persistence.PersistRequest; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class PublishHandler extends BaseHandler { - - private PersistenceManager persistenceMgr; - private final OpStats pubStats; - - public PublishHandler(TopicManager topicMgr, PersistenceManager persistenceMgr, ServerConfiguration cfg) { - super(topicMgr, cfg); - this.persistenceMgr = persistenceMgr; - this.pubStats = ServerStats.getInstance().getOpStats(OperationType.PUBLISH); - } - - @Override - public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) { - if (!request.hasPublishRequest()) { - UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(), - "Missing publish request data"); - pubStats.incrementFailedOps(); - return; - } - - Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion( - cfg.getMyRegionByteString()).build(); - - final long requestTime = MathUtils.now(); - PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize, - new Callback<PubSubProtocol.MessageSeqId>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())); - pubStats.incrementFailedOps(); - } - - @Override - public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) { - channel.write(getSuccessResponse(request.getTxnId(), resultOfOperation)); - pubStats.updateLatency(MathUtils.now() - requestTime); - } - }, null); - - persistenceMgr.persistMessage(persistRequest); - } - - private static PubSubProtocol.PubSubResponse getSuccessResponse(long txnId, PubSubProtocol.MessageSeqId publishedMessageSeqId) { - if (null == publishedMessageSeqId) { - return PubSubResponseUtils.getSuccessResponse(txnId); - } - PubSubProtocol.PublishResponse publishResponse = PubSubProtocol.PublishResponse.newBuilder().setPublishedMsgId(publishedMessageSeqId).build(); - PubSubProtocol.ResponseBody responseBody = PubSubProtocol.ResponseBody.newBuilder().setPublishResponse(publishResponse).build(); - return PubSubProtocol.PubSubResponse.newBuilder(). - setProtocolVersion(PubSubResponseUtils.serverVersion). - setStatusCode(PubSubProtocol.StatusCode.SUCCESS).setTxnId(txnId). - setResponseBody(responseBody).build(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java deleted file mode 100644 index 6df70a3..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFutureListener; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException; -import org.apache.hedwig.filter.PipelineFilter; -import org.apache.hedwig.filter.ServerMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.ChannelEndPoint; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.netty.ServerStats.OpStats; -import org.apache.hedwig.server.netty.UmbrellaHandler; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.subscriptions.SubscriptionManager; -import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class SubscribeHandler extends BaseHandler { - private static final Logger logger = LoggerFactory.getLogger(SubscribeHandler.class); - - private final DeliveryManager deliveryMgr; - private final PersistenceManager persistenceMgr; - private final SubscriptionManager subMgr; - private final SubscriptionChannelManager subChannelMgr; - - // op stats - private final OpStats subStats; - - public SubscribeHandler(ServerConfiguration cfg, TopicManager topicMgr, - DeliveryManager deliveryManager, - PersistenceManager persistenceMgr, - SubscriptionManager subMgr, - SubscriptionChannelManager subChannelMgr) { - super(topicMgr, cfg); - this.deliveryMgr = deliveryManager; - this.persistenceMgr = persistenceMgr; - this.subMgr = subMgr; - this.subChannelMgr = subChannelMgr; - subStats = ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE); - } - - @Override - public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) { - - if (!request.hasSubscribeRequest()) { - UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(), - "Missing subscribe request data"); - subStats.incrementFailedOps(); - return; - } - - final ByteString topic = request.getTopic(); - - MessageSeqId seqId; - try { - seqId = persistenceMgr.getCurrentSeqIdForTopic(topic); - } catch (ServerNotResponsibleForTopicException e) { - channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener( - ChannelFutureListener.CLOSE); - logger.error("Error getting current seq id for topic " + topic.toStringUtf8() - + " when processing subscribe request (txnid:" + request.getTxnId() + ") :", e); - subStats.incrementFailedOps(); - ServerStats.getInstance().incrementRequestsRedirect(); - return; - } - - final SubscribeRequest subRequest = request.getSubscribeRequest(); - final ByteString subscriberId = subRequest.getSubscriberId(); - - MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build(); - - final long requestTime = MathUtils.now(); - subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<SubscriptionData>() { - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener( - ChannelFutureListener.CLOSE); - logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: " - + topic.toStringUtf8() + " , subscriber: " + subscriberId.toStringUtf8() + ")", exception); - subStats.incrementFailedOps(); - } - - @Override - public void operationFinished(Object ctx, final SubscriptionData subData) { - - TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId); - synchronized (channel) { - if (!channel.isConnected()) { - // channel got disconnected while we were processing the - // subscribe request, - // nothing much we can do in this case - subStats.incrementFailedOps(); - return; - } - } - // initialize the message filter - PipelineFilter filter = new PipelineFilter(); - try { - // the filter pipeline should be - // 1) AllToAllTopologyFilter to filter cross-region messages - filter.addLast(new AllToAllTopologyFilter()); - // 2) User-Customized MessageFilter - if (subData.hasPreferences() && - subData.getPreferences().hasMessageFilter()) { - String messageFilterName = subData.getPreferences().getMessageFilter(); - filter.addLast(ReflectionUtils.newInstance(messageFilterName, ServerMessageFilter.class)); - } - // initialize the filter - filter.initialize(cfg.getConf()); - filter.setSubscriptionPreferences(topic, subscriberId, - subData.getPreferences()); - } catch (RuntimeException re) { - String errMsg = "RuntimeException caught when instantiating message filter for (topic:" - + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() + ")." - + "It might be introduced by programming error in message filter."; - logger.error(errMsg, re); - PubSubException pse = new PubSubException.InvalidMessageFilterException(errMsg, re); - subStats.incrementFailedOps(); - // we should not close the subscription channel, just response error - // client decide to close it or not. - channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId())); - return; - } catch (Throwable t) { - String errMsg = "Failed to instantiate message filter for (topic:" + topic.toStringUtf8() - + ", subscriber:" + subscriberId.toStringUtf8() + ")."; - logger.error(errMsg, t); - PubSubException pse = new PubSubException.InvalidMessageFilterException(errMsg, t); - subStats.incrementFailedOps(); - channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId())) - .addListener(ChannelFutureListener.CLOSE); - return; - } - boolean forceAttach = false; - if (subRequest.hasForceAttach()) { - forceAttach = subRequest.getForceAttach(); - } - // Try to store the subscription channel for the topic subscriber - Channel oldChannel = subChannelMgr.put(topicSub, channel, forceAttach); - if (null != oldChannel) { - PubSubException pse = new PubSubException.TopicBusyException( - "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8() - + " is already being served on a different channel " + oldChannel + "."); - subStats.incrementFailedOps(); - channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId())) - .addListener(ChannelFutureListener.CLOSE); - return; - } - - // want to start 1 ahead of the consume ptr - MessageSeqId lastConsumedSeqId = subData.getState().getMsgId(); - MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(lastConsumedSeqId).setLocalComponent( - lastConsumedSeqId.getLocalComponent() + 1).build(); - deliveryMgr.startServingSubscription(topic, subscriberId, - subData.getPreferences(), seqIdToStartFrom, new ChannelEndPoint(channel), filter, - new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void result) { - // First write success and then tell the delivery manager, - // otherwise the first message might go out before the response - // to the subscribe - SubscribeResponse.Builder subRespBuilder = SubscribeResponse.newBuilder() - .setPreferences(subData.getPreferences()); - ResponseBody respBody = ResponseBody.newBuilder() - .setSubscribeResponse(subRespBuilder).build(); - channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId(), respBody)); - logger.info("Subscribe request (" + request.getTxnId() + ") for (topic:" - + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() - + ") from channel " + channel.getRemoteAddress() - + " succeed - its subscription data is " - + SubscriptionStateUtils.toString(subData)); - subStats.updateLatency(MathUtils.now() - requestTime); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - // would not happened - } - }, null); - } - }, null); - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java deleted file mode 100644 index 3481d81..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.handlers; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; - -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import static org.apache.hedwig.util.VarArgs.va; - -public class SubscriptionChannelManager implements ChannelDisconnectListener { - - private static final Logger logger = LoggerFactory.getLogger(SubscriptionChannelManager.class); - - static class CloseSubscriptionListener implements ChannelFutureListener { - - final TopicSubscriber ts; - - CloseSubscriptionListener(TopicSubscriber topicSubscriber) { - this.ts = topicSubscriber; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - logger.warn("Failed to write response to close old subscription {}.", ts); - } else { - logger.debug("Close old subscription {} succeed.", ts); - } - } - }; - - final List<SubChannelDisconnectedListener> listeners; - - public interface SubChannelDisconnectedListener { - /** - * Act on a particular topicSubscriber being disconnected - * @param topicSubscriber - */ - public void onSubChannelDisconnected(TopicSubscriber topicSubscriber); - } - - final ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel; - final ConcurrentHashMap<Channel, Set<TopicSubscriber>> channel2sub; - - public SubscriptionChannelManager() { - sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>(); - channel2sub = new ConcurrentHashMap<Channel, Set<TopicSubscriber>>(); - listeners = new ArrayList<SubChannelDisconnectedListener>(); - } - - public void addSubChannelDisconnectedListener(SubChannelDisconnectedListener listener) { - if (null != listener) { - listeners.add(listener); - } - } - - @Override - public void channelDisconnected(Channel channel) { - // Evils of synchronized programming: there is a race between a channel - // getting disconnected, and us adding it to the maps when a subscribe - // succeeds - Set<TopicSubscriber> topicSubs; - synchronized (channel) { - topicSubs = channel2sub.remove(channel); - } - if (topicSubs != null) { - for (TopicSubscriber topicSub : topicSubs) { - logger.info("Subscription channel {} for {} is disconnected.", - va(channel.getRemoteAddress(), topicSub)); - // remove entry only currently mapped to given value. - sub2Channel.remove(topicSub, channel); - for (SubChannelDisconnectedListener listener : listeners) { - listener.onSubChannelDisconnected(topicSub); - } - } - } - } - - public int getNumSubscriptionChannels() { - return channel2sub.size(); - } - - public int getNumSubscriptions() { - return sub2Channel.size(); - } - - /** - * Put <code>topicSub</code> on Channel <code>channel</code>. - * - * @param topicSub - * Topic Subscription - * @param channel - * Netty channel - * @param mode - * Create or Attach mode - * @return null succeed, otherwise the old existed channel. - */ - public Channel put(TopicSubscriber topicSub, Channel channel, boolean forceAttach) { - // race with channel getting disconnected while we are adding it - // to the 2 maps - synchronized (channel) { - Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel); - // if a subscribe request send from same channel, - // we treated it a success action. - if (null != oldChannel && !oldChannel.equals(channel)) { - boolean subSuccess = false; - if (forceAttach) { - // it is safe to close old subscription here since the new subscription - // has come from other channel succeed. - synchronized (oldChannel) { - Set<TopicSubscriber> oldTopicSubs = channel2sub.get(oldChannel); - if (null != oldTopicSubs) { - if (!oldTopicSubs.remove(topicSub)) { - logger.warn("Failed to remove old subscription ({}) due to it isn't on channel ({}).", - va(topicSub, oldChannel)); - } else if (oldTopicSubs.isEmpty()) { - channel2sub.remove(oldChannel); - } - } - } - PubSubResponse resp = PubSubResponseUtils.getResponseForSubscriptionEvent( - topicSub.getTopic(), topicSub.getSubscriberId(), - SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED - ); - oldChannel.write(resp).addListener(new CloseSubscriptionListener(topicSub)); - logger.info("Subscribe request for ({}) from channel ({}) closes old subscripiton on channel ({}).", - va(topicSub, channel, oldChannel)); - // try replace the oldChannel - // if replace failure, it migth caused because channelDisconnect callback - // has removed the old channel. - if (!sub2Channel.replace(topicSub, oldChannel, channel)) { - // try to add it now. - // if add failure, it means other one has obtained the channel - oldChannel = sub2Channel.putIfAbsent(topicSub, channel); - if (null == oldChannel) { - subSuccess = true; - } - } else { - subSuccess = true; - } - } - if (!subSuccess) { - logger.error("Error serving subscribe request for ({}) from ({}) since it already served on ({}).", - va(topicSub, channel, oldChannel)); - return oldChannel; - } - } - // channel2sub is just a cache, so we can add to it - // without synchronization - Set<TopicSubscriber> topicSubs = channel2sub.get(channel); - if (null == topicSubs) { - topicSubs = new HashSet<TopicSubscriber>(); - channel2sub.put(channel, topicSubs); - } - topicSubs.add(topicSub); - return null; - } - } - - /** - * Remove <code>topicSub</code> from Channel <code>channel</code> - * - * @param topicSub - * Topic Subscription - * @param channel - * Netty channel - */ - public void remove(TopicSubscriber topicSub, Channel channel) { - synchronized (channel) { - Set<TopicSubscriber> topicSubs = channel2sub.get(channel); - if (null != topicSubs) { - if (!topicSubs.remove(topicSub)) { - logger.warn("Failed to remove subscription ({}) due to it isn't on channel ({}).", - va(topicSub, channel)); - } else if (topicSubs.isEmpty()) { - channel2sub.remove(channel); - } - } - if (!sub2Channel.remove(topicSub, channel)) { - logger.warn("Failed to remove channel ({}) due to it isn't ({})'s channel.", - va(channel, topicSub)); - } - } - } -}
