http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java deleted file mode 100644 index 614efa1..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractHChannelManager.java +++ /dev/null @@ -1,622 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl; - -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.exceptions.NoResponseHandlerException; -import org.apache.hedwig.client.handlers.MessageConsumeCallback; -import org.apache.hedwig.client.netty.CleanupChannelMap; -import org.apache.hedwig.client.netty.HChannel; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.client.netty.NetUtils; -import org.apache.hedwig.client.netty.SubscriptionEventEmitter; -import org.apache.hedwig.client.ssl.SslClientContextFactory; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.filter.ClientMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.util.Callback; -import static org.apache.hedwig.util.VarArgs.va; - -/** - * Basic HChannel Manager Implementation - */ -public abstract class AbstractHChannelManager implements HChannelManager { - - private static final Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class); - - // Empty Topic List - private final static Set<ByteString> EMPTY_TOPIC_SET = - new HashSet<ByteString>(); - - // Boolean indicating if the channel manager is running or has been closed. - // Once we stop the manager, we should sidestep all of the connect, write callback - // and channel disconnected logic. - protected boolean closed = false; - protected final ReentrantReadWriteLock closedLock = - new ReentrantReadWriteLock(); - - // Global counter used for generating unique transaction ID's for - // publish and subscribe requests - protected final AtomicLong globalCounter = new AtomicLong(); - - // Concurrent Map to store the mapping from the Topic to the Host. - // This could change over time since servers can drop mastership of topics - // for load balancing or failover. If a server host ever goes down, we'd - // also want to remove all topic mappings the host was responsible for. - // The second Map is used as the inverted version of the first one. - protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = - new ConcurrentHashMap<ByteString, InetSocketAddress>(); - // The inverse mapping is used only when clearing all topics. For performance - // consideration, we don't guarantee host2Topics to be consistent with - // topic2Host. it would be better to not rely on this mapping for anything - // significant. - protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics = - new ConcurrentHashMap<InetSocketAddress, Set<ByteString>>(); - - // This channels will be used for publish and unsubscribe requests - protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels = - new CleanupChannelMap<InetSocketAddress>(); - - private final ClientConfiguration cfg; - // The Netty socket factory for making connections to the server. - protected final ChannelFactory socketFactory; - // PipelineFactory to create non-subscription netty channels to the appropriate server - private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory; - // ssl context factory - private SslClientContextFactory sslFactory = null; - - // default server channel - private final HChannel defaultServerChannel; - - // Each client instantiation will have a Timer for running recurring - // threads. One such timer task thread to is to timeout long running - // PubSubRequests that are waiting for an ack response from the server. - private final Timer clientTimer = new Timer(true); - // a common consume callback for all consume requests. - private final MessageConsumeCallback consumeCb; - // A event emitter to emit subscription events - private final SubscriptionEventEmitter eventEmitter; - - protected AbstractHChannelManager(ClientConfiguration cfg, - ChannelFactory socketFactory) { - this.cfg = cfg; - this.socketFactory = socketFactory; - this.nonSubscriptionChannelPipelineFactory = - new NonSubscriptionChannelPipelineFactory(cfg, this); - - // create a default server channel - defaultServerChannel = - new DefaultServerChannel(cfg.getDefaultServerHost(), this); - - if (cfg.isSSLEnabled()) { - sslFactory = new SslClientContextFactory(cfg); - } - - consumeCb = new MessageConsumeCallback(cfg, this); - eventEmitter = new SubscriptionEventEmitter(); - - // Schedule Request Timeout task. - clientTimer.schedule(new PubSubRequestTimeoutTask(), 0, - cfg.getTimeoutThreadRunInterval()); - } - - @Override - public SubscriptionEventEmitter getSubscriptionEventEmitter() { - return eventEmitter; - } - - public MessageConsumeCallback getConsumeCallback() { - return consumeCb; - } - - public SslClientContextFactory getSslFactory() { - return sslFactory; - } - - protected ChannelFactory getChannelFactory() { - return socketFactory; - } - - protected ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() { - return this.nonSubscriptionChannelPipelineFactory; - } - - protected abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory(); - - @Override - public void schedule(final TimerTask task, final long delay) { - this.closedLock.readLock().lock(); - try { - if (closed) { - logger.warn("Task {} is not scheduled due to the channel manager is closed.", - task); - return; - } - clientTimer.schedule(task, delay); - } finally { - this.closedLock.readLock().unlock(); - } - } - - @Override - public void submitOpAfterDelay(final PubSubData pubSubData, final long delay) { - this.closedLock.readLock().lock(); - try { - if (closed) { - pubSubData.getCallback().operationFailed(pubSubData.context, - new ServiceDownException("Client has been closed.")); - return; - } - clientTimer.schedule(new TimerTask() { - @Override - public void run() { - logger.debug("Submit request {} in {} ms later.", - va(pubSubData, delay)); - submitOp(pubSubData); - } - }, delay); - } finally { - closedLock.readLock().unlock(); - } - } - - @Override - public void submitOp(PubSubData pubSubData) { - HChannel hChannel; - if (OperationType.PUBLISH.equals(pubSubData.operationType) || - OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) { - hChannel = getNonSubscriptionChannelByTopic(pubSubData.topic); - } else { - TopicSubscriber ts = new TopicSubscriber(pubSubData.topic, - pubSubData.subscriberId); - hChannel = getSubscriptionChannelByTopicSubscriber(ts); - } - // no channel found to submit pubsub data - // choose the default server - if (null == hChannel) { - hChannel = defaultServerChannel; - } - hChannel.submitOp(pubSubData); - } - - @Override - public void redirectToHost(PubSubData pubSubData, InetSocketAddress host) { - logger.debug("Submit operation {} to host {}.", - va(pubSubData, host)); - HChannel hChannel; - if (OperationType.PUBLISH.equals(pubSubData.operationType) || - OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) { - hChannel = getNonSubscriptionChannel(host); - if (null == hChannel) { - // create a channel to connect to specified host - hChannel = createAndStoreNonSubscriptionChannel(host); - } - } else { - hChannel = getSubscriptionChannel(host); - if (null == hChannel) { - // create a subscription channel to specified host - hChannel = createAndStoreSubscriptionChannel(host); - } - } - // no channel found to submit pubsub data - // choose the default server - if (null == hChannel) { - hChannel = defaultServerChannel; - } - hChannel.submitOp(pubSubData); - } - - void submitOpThruChannel(PubSubData pubSubData, Channel channel) { - logger.debug("Submit operation {} to thru channel {}.", - va(pubSubData, channel)); - HChannel hChannel; - if (OperationType.PUBLISH.equals(pubSubData.operationType) || - OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) { - hChannel = createAndStoreNonSubscriptionChannel(channel); - } else { - hChannel = createAndStoreSubscriptionChannel(channel); - } - hChannel.submitOp(pubSubData); - } - - @Override - public void submitOpToDefaultServer(PubSubData pubSubData) { - logger.debug("Submit operation {} to default server {}.", - va(pubSubData, defaultServerChannel)); - defaultServerChannel.submitOp(pubSubData); - } - - // Synchronized method to store the host2Channel mapping (if it doesn't - // exist yet). Retrieve the hostname info from the Channel created via the - // RemoteAddress tied to it. - private HChannel createAndStoreNonSubscriptionChannel(Channel channel) { - InetSocketAddress host = NetUtils.getHostFromChannel(channel); - HChannel newHChannel = new HChannelImpl(host, channel, this, - getNonSubscriptionChannelPipelineFactory()); - return storeNonSubscriptionChannel(host, newHChannel); - } - - private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress host) { - HChannel newHChannel = new HChannelImpl(host, this, - getNonSubscriptionChannelPipelineFactory()); - return storeNonSubscriptionChannel(host, newHChannel); - } - - private HChannel storeNonSubscriptionChannel(InetSocketAddress host, - HChannel newHChannel) { - return host2NonSubscriptionChannels.addChannel(host, newHChannel); - } - - /** - * Is there a {@link HChannel} existed for a given host. - * - * @param host - * Target host address. - */ - private HChannel getNonSubscriptionChannel(InetSocketAddress host) { - return host2NonSubscriptionChannels.getChannel(host); - } - - /** - * Get a non-subscription channel for a given <code>topic</code>. - * - * @param topic - * Topic Name - * @return if <code>topic</code>'s owner is unknown, return null. - * if <code>topic</code>'s owner is know and there is a channel - * existed before, return the existed channel, otherwise created - * a new one. - */ - private HChannel getNonSubscriptionChannelByTopic(ByteString topic) { - InetSocketAddress host = topic2Host.get(topic); - if (null == host) { - // we don't know where is the topic - return null; - } else { - // we had know which server owned the topic - HChannel channel = getNonSubscriptionChannel(host); - if (null == channel) { - // create a channel to connect to specified host - channel = createAndStoreNonSubscriptionChannel(host); - } - return channel; - } - } - - /** - * Handle the disconnected event from a non-subscription {@link HChannel}. - * - * @param host - * Which host is disconnected. - * @param channel - * The underlying established channel. - */ - protected void onNonSubscriptionChannelDisconnected(InetSocketAddress host, - Channel channel) { - // Only remove the Channel from the mapping if this current - // disconnected channel is the same as the cached entry. - // Due to race concurrency situations, it is possible to - // create multiple channels to the same host for publish - // and unsubscribe requests. - HChannel hChannel = host2NonSubscriptionChannels.getChannel(host); - if (null == hChannel) { - return; - } - Channel underlyingChannel = hChannel.getChannel(); - if (null == underlyingChannel || - !underlyingChannel.equals(channel)) { - return; - } - logger.info("NonSubscription Channel {} to {} disconnected.", - va(channel, host)); - // remove existed channel - if (host2NonSubscriptionChannels.removeChannel(host, hChannel)) { - clearAllTopicsForHost(host); - } - } - - /** - * Create and store a subscription {@link HChannel} thru the underlying established - * <code>channel</code> - * - * @param channel - * The underlying established subscription channel. - */ - protected abstract HChannel createAndStoreSubscriptionChannel(Channel channel); - - /** - * Create and store a subscription {@link HChannel} to target host. - * - * @param host - * Target host address. - */ - protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress host); - - /** - * Is there a subscription {@link HChannel} existed for a given host. - * - * @param host - * Target host address. - */ - protected abstract HChannel getSubscriptionChannel(InetSocketAddress host); - - /** - * Get a subscription channel for a given <code>topicSubscriber</code>. - * - * @param topicSubscriber - * Topic Subscriber - * @return if <code>topic</code>'s owner is unknown, return null. - * if <code>topic</code>'s owner is know and there is a channel - * existed before, return the existed channel, otherwise created - * a new one for the <code>topicSubscriber</code>. - */ - protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber topicSubscriber); - - /** - * Handle the disconnected event from a subscription {@link HChannel}. - * - * @param host - * Which host is disconnected. - * @param channel - * The underlying established channel. - */ - protected abstract void onSubscriptionChannelDisconnected(InetSocketAddress host, - Channel channel); - - private void sendConsumeRequest(final TopicSubscriber topicSubscriber, - final MessageSeqId messageSeqId, - final Channel channel) { - PubSubRequest.Builder pubsubRequestBuilder = - NetUtils.buildConsumeRequest(nextTxnId(), topicSubscriber, messageSeqId); - - // For Consume requests, we will send them from the client in a fire and - // forget manner. We are not expecting the server to send back an ack - // response so no need to register this in the ResponseHandler. There - // are no callbacks to invoke since this isn't a client initiated - // action. Instead, just have a future listener that will log an error - // message if there was a problem writing the consume request. - logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}", - va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber)); - ChannelFuture future = channel.write(pubsubRequestBuilder.build()); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}", - va(NetUtils.getHostFromChannel(channel), - messageSeqId, topicSubscriber)); - } - } - }); - } - - /** - * Helper method to store the topic2Host mapping in the channel manager cache - * map. This method is assumed to be called when we've done a successful - * connection to the correct server topic master. - * - * @param topic - * Topic Name - * @param host - * Host Address - */ - protected void storeTopic2HostMapping(ByteString topic, InetSocketAddress host) { - InetSocketAddress oldHost = topic2Host.putIfAbsent(topic, host); - if (null != oldHost && oldHost.equals(host)) { - // Entry in map exists for the topic but it is the same as the - // current host. In this case there is nothing to do. - return; - } - - if (null != oldHost) { - if (topic2Host.replace(topic, oldHost, host)) { - // Store the relevant mappings for this topic and host combination. - logger.debug("Storing info for topic: {}, old host: {}, new host: {}.", - va(topic.toStringUtf8(), oldHost, host)); - clearHostForTopic(topic, oldHost); - } else { - logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}", - va(topic.toStringUtf8(), oldHost, topic2Host.get(topic), host)); - return; - } - } else { - logger.debug("Storing info for topic: {}, host: {}.", - va(topic.toStringUtf8(), host)); - } - Set<ByteString> topicsForHost = host2Topics.get(host); - if (null == topicsForHost) { - Set<ByteString> newTopicsSet = new HashSet<ByteString>(); - topicsForHost = host2Topics.putIfAbsent(host, newTopicsSet); - if (null == topicsForHost) { - topicsForHost = newTopicsSet; - } - } - synchronized (topicsForHost) { - // check whether the ownership changed, since it might happened - // after replace succeed - if (host.equals(topic2Host.get(topic))) { - topicsForHost.add(topic); - } - } - } - - // If a server host goes down or the channel to it gets disconnected, - // we want to clear out all relevant cached information. We'll - // need to remove all of the topic mappings that the host was - // responsible for. - protected void clearAllTopicsForHost(InetSocketAddress host) { - logger.debug("Clearing all topics for host: {}", host); - // For each of the topics that the host was responsible for, - // remove it from the topic2Host mapping. - Set<ByteString> topicsForHost = host2Topics.get(host); - if (null != topicsForHost) { - synchronized (topicsForHost) { - for (ByteString topic : topicsForHost) { - logger.debug("Removing mapping for topic: {} from host: {}.", - va(topic.toStringUtf8(), host)); - topic2Host.remove(topic, host); - } - } - // Now it is safe to remove the host2Topics mapping entry. - host2Topics.remove(host, topicsForHost); - } - } - - // If a subscribe channel goes down, the topic might have moved. - // We only clear out that topic for the host and not all cached information. - public void clearHostForTopic(ByteString topic, InetSocketAddress host) { - logger.debug("Clearing topic: {} from host: {}.", - va(topic.toStringUtf8(), host)); - if (topic2Host.remove(topic, host)) { - logger.debug("Removed topic to host mapping for topic: {} and host: {}.", - va(topic.toStringUtf8(), host)); - } - Set<ByteString> topicsForHost = host2Topics.get(host); - if (null != topicsForHost) { - boolean removed; - synchronized (topicsForHost) { - removed = topicsForHost.remove(topic); - } - if (removed) { - logger.debug("Removed topic: {} from host: {}.", - topic.toStringUtf8(), host); - if (topicsForHost.isEmpty()) { - // remove only topic list is empty - host2Topics.remove(host, EMPTY_TOPIC_SET); - } - } - } - } - - @Override - public long nextTxnId() { - return globalCounter.incrementAndGet(); - } - - // We need to deal with the possible problem of a PubSub request being - // written to successfully to the server host but for some reason, the - // ack message back never comes. What could happen is that the VoidCallback - // stored in the ResponseHandler.txn2PublishData map will never be called. - // We should have a configured timeout so if that passes from the time a - // write was successfully done to the server, we can fail this async PubSub - // transaction. The caller could possibly redo the transaction if needed at - // a later time. Creating a timeout cleaner TimerTask to do this here. - class PubSubRequestTimeoutTask extends TimerTask { - /** - * Implement the TimerTask's abstract run method. - */ - @Override - public void run() { - if (isClosed()) { - return; - } - logger.debug("Running the PubSubRequest Timeout Task"); - // First check those non-subscription channels - for (HChannel channel : host2NonSubscriptionChannels.getChannels()) { - try { - HChannelHandler channelHandler = - HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel()); - channelHandler.checkTimeoutRequests(); - } catch (NoResponseHandlerException nrhe) { - continue; - } - } - // Then check those subscription channels - checkTimeoutRequestsOnSubscriptionChannels(); - } - } - - protected abstract void restartDelivery(TopicSubscriber topicSubscriber) - throws ClientNotSubscribedException, AlreadyStartDeliveryException; - - /** - * Chekout the pub/sub requests on subscription channels. - */ - protected abstract void checkTimeoutRequestsOnSubscriptionChannels(); - - @Override - public boolean isClosed() { - closedLock.readLock().lock(); - try { - return closed; - } finally { - closedLock.readLock().unlock(); - } - } - - /** - * Close all subscription channels when close channel manager. - */ - protected abstract void closeSubscriptionChannels(); - - @Override - public void close() { - logger.info("Shutting down the channels manager."); - closedLock.writeLock().lock(); - try { - // Not first time to close - if (closed) { - return; - } - closed = true; - } finally { - closedLock.writeLock().unlock(); - } - clientTimer.cancel(); - // Clear all existed channels - host2NonSubscriptionChannels.close(); - - // clear all subscription channels - closeSubscriptionChannels(); - - // Clear out all Maps - topic2Host.clear(); - host2Topics.clear(); - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java deleted file mode 100644 index 7fcfc44..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java +++ /dev/null @@ -1,365 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl; - -import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.google.protobuf.ByteString; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.handlers.SubscribeResponseHandler; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.client.netty.HChannel; -import org.apache.hedwig.client.netty.NetUtils; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.util.Either; -import static org.apache.hedwig.util.VarArgs.va; - -public abstract class AbstractSubscribeResponseHandler extends SubscribeResponseHandler { - - private static final Logger logger = - LoggerFactory.getLogger(AbstractSubscribeResponseHandler.class); - - protected final ReentrantReadWriteLock disconnectLock = - new ReentrantReadWriteLock(); - - protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions - = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>(); - protected final AbstractHChannelManager aChannelManager; - - protected AbstractSubscribeResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - super(cfg, channelManager); - this.aChannelManager = (AbstractHChannelManager) channelManager; - } - - protected HChannelManager getHChannelManager() { - return this.channelManager; - } - - protected ClientConfiguration getConfiguration() { - return cfg; - } - - protected ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) { - return subscriptions.get(ts); - } - - protected ActiveSubscriber createActiveSubscriber( - ClientConfiguration cfg, AbstractHChannelManager channelManager, - TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences, - Channel channel, HChannel hChannel) { - return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel); - } - - @Override - public void handleResponse(PubSubResponse response, PubSubData pubSubData, - Channel channel) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.", - va(response, pubSubData, NetUtils.getHostFromChannel(channel))); - } - switch (response.getStatusCode()) { - case SUCCESS: - TopicSubscriber ts = new TopicSubscriber(pubSubData.topic, - pubSubData.subscriberId); - SubscriptionPreferences preferences = null; - if (response.hasResponseBody()) { - ResponseBody respBody = response.getResponseBody(); - if (respBody.hasSubscribeResponse()) { - SubscribeResponse resp = respBody.getSubscribeResponse(); - if (resp.hasPreferences()) { - preferences = resp.getPreferences(); - if (logger.isDebugEnabled()) { - logger.debug("Receive subscription preferences for {} : {}", - va(ts, - SubscriptionStateUtils.toString(preferences))); - } - } - } - } - - Either<StatusCode, HChannel> result; - StatusCode statusCode; - ActiveSubscriber ss = null; - // Store the Subscribe state - disconnectLock.readLock().lock(); - try { - result = handleSuccessResponse(ts, pubSubData, channel); - statusCode = result.left(); - if (StatusCode.SUCCESS == statusCode) { - ss = createActiveSubscriber( - cfg, aChannelManager, ts, pubSubData, preferences, channel, result.right()); - statusCode = addSubscription(ts, ss); - } - } finally { - disconnectLock.readLock().unlock(); - } - if (StatusCode.SUCCESS == statusCode) { - postHandleSuccessResponse(ts, ss); - // Response was success so invoke the callback's operationFinished - // method. - pubSubData.getCallback().operationFinished(pubSubData.context, null); - } else { - PubSubException exception = PubSubException.create(statusCode, - "Client is already subscribed for " + ts); - pubSubData.getCallback().operationFailed(pubSubData.context, exception); - } - break; - case CLIENT_ALREADY_SUBSCRIBED: - // For Subscribe requests, the server says that the client is - // already subscribed to it. - pubSubData.getCallback().operationFailed(pubSubData.context, - new ClientAlreadySubscribedException("Client is already subscribed for topic: " - + pubSubData.topic.toStringUtf8() + ", subscriberId: " - + pubSubData.subscriberId.toStringUtf8())); - break; - case SERVICE_DOWN: - // Response was service down failure so just invoke the callback's - // operationFailed method. - pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a SERVICE_DOWN status")); - break; - case NOT_RESPONSIBLE_FOR_TOPIC: - // Redirect response so we'll need to repost the original Subscribe - // Request - handleRedirectResponse(response, pubSubData, channel); - break; - default: - // Consider all other status codes as errors, operation failed - // cases. - logger.error("Unexpected error response from server for PubSubResponse: " + response); - pubSubData.getCallback().operationFailed(pubSubData.context, - new ServiceDownException("Server responded with a status code of: " - + response.getStatusCode(), - PubSubException.create(response.getStatusCode(), - "Original Exception"))); - break; - } - } - - /** - * Handle success response for a specific TopicSubscriber <code>ts</code>. The method - * is triggered after subscribed successfully. - * - * @param ts - * Topic Subscriber. - * @param pubSubData - * Pub/Sub Request data for this subscribe request. - * @param channel - * Subscription Channel. - * @return status code to indicate what happened - */ - protected abstract Either<StatusCode, HChannel> handleSuccessResponse( - TopicSubscriber ts, PubSubData pubSubData, Channel channel); - - protected void postHandleSuccessResponse(TopicSubscriber ts, ActiveSubscriber ss) { - // do nothing now - } - - private StatusCode addSubscription(TopicSubscriber ts, ActiveSubscriber ss) { - ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss); - if (null != oldSS) { - return StatusCode.CLIENT_ALREADY_SUBSCRIBED; - } else { - return StatusCode.SUCCESS; - } - } - - @Override - public void handleSubscribeMessage(PubSubResponse response) { - Message message = response.getMessage(); - TopicSubscriber ts = new TopicSubscriber(response.getTopic(), - response.getSubscriberId()); - if (logger.isDebugEnabled()) { - logger.debug("Handling a Subscribe message in response: {}, {}", - va(response, ts)); - } - ActiveSubscriber ss = getActiveSubscriber(ts); - if (null == ss) { - logger.error("Subscriber {} is not found receiving its message {}.", - va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId()))); - return; - } - ss.handleMessage(message); - } - - @Override - protected void asyncMessageDeliver(TopicSubscriber topicSubscriber, - Message message) { - ActiveSubscriber ss = getActiveSubscriber(topicSubscriber); - if (null == ss) { - logger.error("Subscriber {} is not found delivering its message {}.", - va(topicSubscriber, - MessageIdUtils.msgIdToReadableString(message.getMsgId()))); - return; - } - ss.asyncMessageDeliver(message); - } - - @Override - protected void messageConsumed(TopicSubscriber topicSubscriber, - Message message) { - ActiveSubscriber ss = getActiveSubscriber(topicSubscriber); - if (null == ss) { - logger.warn("Subscriber {} is not found consumed its message {}.", - va(topicSubscriber, - MessageIdUtils.msgIdToReadableString(message.getMsgId()))); - return; - } - if (logger.isDebugEnabled()) { - logger.debug("Message has been successfully consumed by the client app : {}, {}", - va(message, topicSubscriber)); - } - ss.messageConsumed(message); - } - - @Override - public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId, - SubscriptionEvent event) { - TopicSubscriber ts = new TopicSubscriber(topic, subscriberId); - ActiveSubscriber ss = getActiveSubscriber(ts); - if (null == ss) { - logger.warn("No subscription {} found receiving subscription event {}.", - va(ts, event)); - return; - } - if (logger.isDebugEnabled()) { - logger.debug("Received subscription event {} for ({}).", - va(event, ts)); - } - processSubscriptionEvent(ss, event); - } - - protected void processSubscriptionEvent(ActiveSubscriber as, SubscriptionEvent event) { - switch (event) { - // for all cases we need to resubscribe for the subscription - case TOPIC_MOVED: - case SUBSCRIPTION_FORCED_CLOSED: - resubscribeIfNecessary(as, event); - break; - default: - logger.error("Receive unknown subscription event {} for {}.", - va(event, as.getTopicSubscriber())); - } - } - - @Override - public void startDelivery(final TopicSubscriber topicSubscriber, - MessageHandler messageHandler) - throws ClientNotSubscribedException, AlreadyStartDeliveryException { - ActiveSubscriber ss = getActiveSubscriber(topicSubscriber); - if (null == ss) { - throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber); - } - if (logger.isDebugEnabled()) { - logger.debug("Start delivering message for {} using message handler {}", - va(topicSubscriber, messageHandler)); - } - ss.startDelivery(messageHandler); - } - - @Override - public void stopDelivery(final TopicSubscriber topicSubscriber) - throws ClientNotSubscribedException { - ActiveSubscriber ss = getActiveSubscriber(topicSubscriber); - if (null == ss) { - throw new ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber); - } - if (logger.isDebugEnabled()) { - logger.debug("Stop delivering messages for {}", topicSubscriber); - } - ss.stopDelivery(); - } - - @Override - public boolean hasSubscription(TopicSubscriber topicSubscriber) { - return subscriptions.containsKey(topicSubscriber); - } - - @Override - public void consume(final TopicSubscriber topicSubscriber, - final MessageSeqId messageSeqId) { - ActiveSubscriber ss = getActiveSubscriber(topicSubscriber); - if (null == ss) { - logger.warn("Subscriber {} is not found consuming message {}.", - va(topicSubscriber, - MessageIdUtils.msgIdToReadableString(messageSeqId))); - return; - } - ss.consume(messageSeqId); - } - - @Override - public void onChannelDisconnected(InetSocketAddress host, Channel channel) { - disconnectLock.writeLock().lock(); - try { - onDisconnect(host); - } finally { - disconnectLock.writeLock().unlock(); - } - } - - private void onDisconnect(InetSocketAddress host) { - for (ActiveSubscriber ss : subscriptions.values()) { - onDisconnect(ss, host); - } - } - - private void onDisconnect(ActiveSubscriber ss, InetSocketAddress host) { - logger.info("Subscription channel for ({}) is disconnected.", ss); - resubscribeIfNecessary(ss, SubscriptionEvent.TOPIC_MOVED); - } - - protected boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) { - return subscriptions.remove(ts, ss); - } - - protected void resubscribeIfNecessary(ActiveSubscriber ss, SubscriptionEvent event) { - // if subscriber has been changed, we don't need to resubscribe - if (!removeSubscription(ss.getTopicSubscriber(), ss)) { - return; - } - ss.resubscribeIfNecessary(event); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java deleted file mode 100644 index 10506d8..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl; - -import static org.apache.hedwig.util.VarArgs.va; - -import java.util.LinkedList; -import java.util.Queue; - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.MessageConsumeData; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.netty.FilterableMessageHandler; -import org.apache.hedwig.client.netty.HChannel; -import org.apache.hedwig.client.netty.NetUtils; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.filter.ClientMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * an active subscriber handles subscription actions in a channel. - */ -public class ActiveSubscriber { - - private static final Logger logger = LoggerFactory.getLogger(ActiveSubscriber.class); - - protected final ClientConfiguration cfg; - protected final AbstractHChannelManager channelManager; - - // Subscriber related variables - protected final TopicSubscriber topicSubscriber; - protected final PubSubData op; - protected final SubscriptionPreferences preferences; - - // the underlying netty channel to send request - protected final Channel channel; - protected final HChannel hChannel; - - // Counter for the number of consumed messages so far to buffer up before we - // send the Consume message back to the server along with the last/largest - // message seq ID seen so far in that batch. - private int numConsumedMessagesInBuffer = 0; - private MessageSeqId lastMessageSeqId = null; - - // Message Handler - private MessageHandler msgHandler = null; - - // Queue used for subscribes when the MessageHandler hasn't been registered - // yet but we've already received subscription messages from the server. - // This will be lazily created as needed. - private final Queue<Message> msgQueue = new LinkedList<Message>(); - - /** - * Construct an active subscriber instance. - * - * @param cfg - * Client configuration object. - * @param channelManager - * Channel manager instance. - * @param ts - * Topic subscriber. - * @param op - * Pub/Sub request. - * @param preferences - * Subscription preferences for the subscriber. - * @param channel - * Netty channel the subscriber lived. - */ - public ActiveSubscriber(ClientConfiguration cfg, - AbstractHChannelManager channelManager, - TopicSubscriber ts, PubSubData op, - SubscriptionPreferences preferences, - Channel channel, - HChannel hChannel) { - this.cfg = cfg; - this.channelManager = channelManager; - this.topicSubscriber = ts; - this.op = op; - this.preferences = preferences; - this.channel = channel; - this.hChannel = hChannel; - } - - /** - * @return pub/sub request for the subscription. - */ - public PubSubData getPubSubData() { - return this.op; - } - - /** - * @return topic subscriber id for the active subscriber. - */ - public TopicSubscriber getTopicSubscriber() { - return this.topicSubscriber; - } - - /** - * Start delivering messages using given message handler. - * - * @param messageHandler - * Message handler to deliver messages - * @throws AlreadyStartDeliveryException if someone already started delivery. - * @throws ClientNotSubscribedException when start delivery before subscribe. - */ - public synchronized void startDelivery(MessageHandler messageHandler) - throws AlreadyStartDeliveryException, ClientNotSubscribedException { - if (null != this.msgHandler) { - throw new AlreadyStartDeliveryException("A message handler " + msgHandler - + " has been started for " + topicSubscriber); - } - if (null != messageHandler && messageHandler instanceof FilterableMessageHandler) { - FilterableMessageHandler filterMsgHandler = - (FilterableMessageHandler) messageHandler; - if (filterMsgHandler.hasMessageFilter()) { - if (null == preferences) { - // no preferences means talking to an old version hub server - logger.warn("Start delivering messages with filter but no subscription " - + "preferences found. It might due to talking to an old version" - + " hub server."); - // use the original message handler. - messageHandler = filterMsgHandler.getMessageHandler(); - } else { - // pass subscription preferences to message filter - if (logger.isDebugEnabled()) { - logger.debug("Start delivering messages with filter on {}, preferences: {}", - va(topicSubscriber, - SubscriptionStateUtils.toString(preferences))); - } - ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter(); - msgFilter.setSubscriptionPreferences(topicSubscriber.getTopic(), - topicSubscriber.getSubscriberId(), - preferences); - } - } - } - - this.msgHandler = messageHandler; - // Once the MessageHandler is registered, see if we have any queued up - // subscription messages sent to us already from the server. If so, - // consume those first. Do this only if the MessageHandler registered is - // not null (since that would be the HedwigSubscriber.stopDelivery - // call). - if (null == msgHandler) { - return; - } - if (msgQueue.size() > 0) { - if (logger.isDebugEnabled()) { - logger.debug("Consuming {} queued up messages for {}", - va(msgQueue.size(), topicSubscriber)); - } - for (Message message : msgQueue) { - asyncMessageDeliver(message); - } - // Now we can remove the queued up messages since they are all - // consumed. - msgQueue.clear(); - } - } - - /** - * Stop delivering messages to the subscriber. - */ - public synchronized void stopDelivery() { - this.msgHandler = null; - } - - /** - * Handle received message. - * - * @param message - * Received message. - */ - public synchronized void handleMessage(Message message) { - if (null != msgHandler) { - asyncMessageDeliver(message); - } else { - // MessageHandler has not yet been registered so queue up these - // messages for the Topic Subscription. Make the initial lazy - // creation of the message queue thread safe just so we don't - // run into a race condition where two simultaneous threads process - // a received message and both try to create a new instance of - // the message queue. Performance overhead should be okay - // because the delivery of the topic has not even started yet - // so these messages are not consumed and just buffered up here. - if (logger.isDebugEnabled()) { - logger.debug("Message {} has arrived but no MessageHandler provided for {}" - + " yet so queueing up the message.", - va(MessageIdUtils.msgIdToReadableString(message.getMsgId()), - topicSubscriber)); - } - msgQueue.add(message); - } - } - - /** - * Deliver message to the client. - * - * @param message - * Message to deliver. - */ - public synchronized void asyncMessageDeliver(Message message) { - if (null == msgHandler) { - logger.error("No message handler found to deliver message {} to {}.", - va(MessageIdUtils.msgIdToReadableString(message.getMsgId()), - topicSubscriber)); - return; - } - if (logger.isDebugEnabled()) { - logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}", - va(message, topicSubscriber)); - } - unsafeDeliverMessage(message); - } - - /** - * Unsafe version to deliver message to a message handler. - * Caller need to handle synchronization issue. - * - * @param message - * Message to deliver. - */ - protected void unsafeDeliverMessage(Message message) { - MessageConsumeData messageConsumeData = - new MessageConsumeData(topicSubscriber, message); - msgHandler.deliver(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), - message, channelManager.getConsumeCallback(), - messageConsumeData); - } - - private synchronized boolean updateLastMessageSeqId(MessageSeqId seqId) { - if (null != lastMessageSeqId && - seqId.getLocalComponent() <= lastMessageSeqId.getLocalComponent()) { - return false; - } - ++numConsumedMessagesInBuffer; - lastMessageSeqId = seqId; - if (numConsumedMessagesInBuffer >= cfg.getConsumedMessagesBufferSize()) { - numConsumedMessagesInBuffer = 0; - lastMessageSeqId = null; - return true; - } - return false; - } - - /** - * Consume a specific message. - * - * @param messageSeqId - * Message seq id. - */ - public void consume(final MessageSeqId messageSeqId) { - PubSubRequest.Builder pubsubRequestBuilder = - NetUtils.buildConsumeRequest(channelManager.nextTxnId(), - topicSubscriber, messageSeqId); - - // For Consume requests, we will send them from the client in a fire and - // forget manner. We are not expecting the server to send back an ack - // response so no need to register this in the ResponseHandler. There - // are no callbacks to invoke since this isn't a client initiated - // action. Instead, just have a future listener that will log an error - // message if there was a problem writing the consume request. - if (logger.isDebugEnabled()) { - logger.debug("Writing a Consume request to channel: {} with messageSeqId: {} for {}", - va(channel, messageSeqId, topicSubscriber)); - } - ChannelFuture future = channel.write(pubsubRequestBuilder.build()); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - logger.error("Error writing a Consume request to channel: {} with messageSeqId: {} for {}", - va(channel, messageSeqId, topicSubscriber)); - } - } - }); - } - - /** - * Application acked to consume message. - * - * @param message - * Message consumed by application. - */ - public void messageConsumed(Message message) { - // For consume response to server, there is a config param on how many - // messages to consume and buffer up before sending the consume request. - // We just need to keep a count of the number of messages consumed - // and the largest/latest msg ID seen so far in this batch. Messages - // should be delivered in order and without gaps. Do this only if - // auto-sending of consume messages is enabled. - if (cfg.isAutoSendConsumeMessageEnabled()) { - // Update these variables only if we are auto-sending consume - // messages to the server. Otherwise the onus is on the client app - // to call the Subscriber consume API to let the server know which - // messages it has successfully consumed. - if (updateLastMessageSeqId(message.getMsgId())) { - // Send the consume request and reset the consumed messages buffer - // variables. We will use the same Channel created from the - // subscribe request for the TopicSubscriber. - if (logger.isDebugEnabled()) { - logger.debug("Consume message {} when reaching consumed message buffer limit.", - message.getMsgId()); - } - consume(message.getMsgId()); - } - } - } - - /** - * Resubscribe a subscriber if necessary. - * - * @param event - * Subscription Event. - */ - public void resubscribeIfNecessary(SubscriptionEvent event) { - // clear topic ownership - if (SubscriptionEvent.TOPIC_MOVED == event) { - channelManager.clearHostForTopic(topicSubscriber.getTopic(), - NetUtils.getHostFromChannel(channel)); - } - if (!op.options.getEnableResubscribe()) { - channelManager.getSubscriptionEventEmitter().emitSubscriptionEvent( - topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), event); - return; - } - // Since the connection to the server host that was responsible - // for the topic died, we are not sure about the state of that - // server. Resend the original subscribe request data to the default - // server host/VIP. Also clear out all of the servers we've - // contacted or attempted to from this request as we are starting a - // "fresh" subscribe request. - op.clearServersList(); - // Set a new type of VoidCallback for this async call. We need this - // hook so after the resubscribe has completed, delivery for - // that topic subscriber should also be restarted (if it was that - // case before the channel disconnect). - final long retryWaitTime = cfg.getSubscribeReconnectRetryWaitTime(); - ResubscribeCallback resubscribeCb = - new ResubscribeCallback(topicSubscriber, op, - channelManager, retryWaitTime); - op.setCallback(resubscribeCb); - op.shouldClaim = false; - op.context = null; - op.setOriginalChannelForResubscribe(hChannel); - if (logger.isDebugEnabled()) { - logger.debug("Resubscribe {} with origSubData {}", - va(topicSubscriber, op)); - } - // resubmit the request - channelManager.submitOp(op); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java deleted file mode 100644 index ab86f23..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ClientChannelPipelineFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl; - -import java.util.Map; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; -import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder; -import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder; -import org.jboss.netty.handler.ssl.SslHandler; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.handlers.AbstractResponseHandler; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; - -public abstract class ClientChannelPipelineFactory implements ChannelPipelineFactory { - - protected ClientConfiguration cfg; - protected AbstractHChannelManager channelManager; - - public ClientChannelPipelineFactory(ClientConfiguration cfg, - AbstractHChannelManager channelManager) { - this.cfg = cfg; - this.channelManager = channelManager; - } - - protected abstract Map<OperationType, AbstractResponseHandler> createResponseHandlers(); - - private HChannelHandler createHChannelHandler() { - return new HChannelHandler(cfg, channelManager, createResponseHandlers()); - } - - // Retrieve a ChannelPipeline from the factory. - public ChannelPipeline getPipeline() throws Exception { - // Create a new ChannelPipline using the factory method from the - // Channels helper class. - ChannelPipeline pipeline = Channels.pipeline(); - if (channelManager.getSslFactory() != null) { - pipeline.addLast("ssl", new SslHandler(channelManager.getSslFactory().getEngine())); - } - pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder( - cfg.getMaximumMessageSize(), 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - - pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); - - pipeline.addLast("responsehandler", createHChannelHandler()); - return pipeline; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java deleted file mode 100644 index 065b2f7..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/DefaultServerChannel.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl; - -import java.net.InetSocketAddress; - -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; - -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import static org.apache.hedwig.util.VarArgs.va; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handle requests sent to default hub server. <b>DefaultServerChannel</b> would not - * be used as a channel to send requests directly. It just takes the responsibility to - * connect to the default server. After the underlying netty channel is established, - * it would call {@link HChannelManager#submitOpThruChannel()} to send requests thru - * the underlying netty channel. - */ -class DefaultServerChannel extends HChannelImpl { - - private static final Logger logger = LoggerFactory.getLogger(DefaultServerChannel.class); - - DefaultServerChannel(InetSocketAddress host, AbstractHChannelManager channelManager) { - super(host, channelManager); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("[DefaultServer: ").append(host).append("]"); - return sb.toString(); - } - - @Override - public void submitOp(final PubSubData pubSubData) { - // for each pub/sub request sent to default hub server - // we would establish a fresh connection for it - ClientChannelPipelineFactory pipelineFactory; - if (OperationType.PUBLISH.equals(pubSubData.operationType) || - OperationType.UNSUBSCRIBE.equals(pubSubData.operationType)) { - pipelineFactory = channelManager.getNonSubscriptionChannelPipelineFactory(); - } else { - pipelineFactory = channelManager.getSubscriptionChannelPipelineFactory(); - } - ChannelFuture future = connect(host, pipelineFactory); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // If the channel has been closed, there is no need to proceed with any callback - // logic here. - if (closed) { - future.getChannel().close(); - return; - } - - // Check if the connection to the server was done successfully. - if (!future.isSuccess()) { - logger.error("Error connecting to host {}.", host); - future.getChannel().close(); - - retryOrFailOp(pubSubData); - // Finished with failure logic so just return. - return; - } - logger.debug("Connected to host {} for pubSubData: {}", - va(host, pubSubData)); - channelManager.submitOpThruChannel(pubSubData, future.getChannel()); - } - }); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java deleted file mode 100644 index c41a329..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.handler.ssl.SslHandler; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.netty.NetUtils; -import org.apache.hedwig.client.handlers.AbstractResponseHandler; -import org.apache.hedwig.client.handlers.SubscribeResponseHandler; -import org.apache.hedwig.exceptions.PubSubException.UncertainStateException; -import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse; -import static org.apache.hedwig.util.VarArgs.va; - -public class HChannelHandler extends SimpleChannelHandler { - - private static final Logger logger = LoggerFactory.getLogger(HChannelHandler.class); - - // Concurrent Map to store for each async PubSub request, the txn ID - // and the corresponding PubSub call's data which stores the VoidCallback to - // invoke when we receive a PubSub ack response from the server. - // This is specific to this instance of the HChannelHandler which is - // tied to a specific netty Channel Pipeline. - private final ConcurrentMap<Long, PubSubData> txn2PubSubData = - new ConcurrentHashMap<Long, PubSubData>(); - - // Boolean indicating if we closed the channel this HChannelHandler is - // attached to explicitly or not. If so, we do not need to do the - // channel disconnected logic here. - private volatile boolean channelClosedExplicitly = false; - - private final AbstractHChannelManager channelManager; - private final ClientConfiguration cfg; - - private final Map<OperationType, AbstractResponseHandler> handlers; - private final SubscribeResponseHandler subHandler; - - public HChannelHandler(ClientConfiguration cfg, - AbstractHChannelManager channelManager, - Map<OperationType, AbstractResponseHandler> handlers) { - this.cfg = cfg; - this.channelManager = channelManager; - this.handlers = handlers; - subHandler = (SubscribeResponseHandler) handlers.get(OperationType.SUBSCRIBE); - } - - public SubscribeResponseHandler getSubscribeResponseHandler() { - return subHandler; - } - - public void removeTxn(long txnId) { - txn2PubSubData.remove(txnId); - } - - public void addTxn(long txnId, PubSubData pubSubData) { - txn2PubSubData.put(txnId, pubSubData); - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - // If the Message is not a PubSubResponse, just send it upstream and let - // something else handle it. - if (!(e.getMessage() instanceof PubSubResponse)) { - ctx.sendUpstream(e); - return; - } - // Retrieve the PubSubResponse from the Message that was sent by the - // server. - PubSubResponse response = (PubSubResponse) e.getMessage(); - logger.debug("Response received from host: {}, response: {}.", - va(NetUtils.getHostFromChannel(ctx.getChannel()), response)); - - // Determine if this PubSubResponse is an ack response for a PubSub - // Request or if it is a message being pushed to the client subscriber. - if (response.hasMessage()) { - // Subscribed messages being pushed to the client so handle/consume - // it and return. - if (null == subHandler) { - logger.error("Received message from a non-subscription channel : {}", - response); - } else { - subHandler.handleSubscribeMessage(response); - } - return; - } - - // Process Subscription Events - if (response.hasResponseBody()) { - ResponseBody resp = response.getResponseBody(); - // A special subscription event indicates the state of a subscriber - if (resp.hasSubscriptionEvent()) { - if (null == subHandler) { - logger.error("Received subscription event from a non-subscription channel : {}", - response); - } else { - SubscriptionEventResponse eventResp = resp.getSubscriptionEvent(); - logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).", - va(eventResp.getEvent(), response.getTopic(), - response.getSubscriberId())); - subHandler.handleSubscriptionEvent(response.getTopic(), - response.getSubscriberId(), - eventResp.getEvent()); - } - return; - } - } - - // Response is an ack to a prior PubSubRequest so first retrieve the - // PubSub data for this txn. - PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId()); - - // Validate that the PubSub data for this txn is stored. If not, just - // log an error message and return since we don't know how to handle - // this. - if (pubSubData == null) { - logger.error("PubSub Data was not found for PubSubResponse: {}", response); - return; - } - - // Store the topic2Host mapping if this wasn't a server redirect. We'll - // assume that if the server was able to have an open Channel connection - // to the client, and responded with an ack message other than the - // NOT_RESPONSIBLE_FOR_TOPIC one, it is the correct topic master. - if (!response.getStatusCode().equals(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) { - // Retrieve the server host that we've connected to and store the - // mapping from the topic to this host. For all other non-redirected - // server statuses, we consider that as a successful connection to the - // correct topic master. - InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel()); - channelManager.storeTopic2HostMapping(pubSubData.topic, host); - } - - // Depending on the operation type, call the appropriate handler. - logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.", - va(pubSubData.operationType, response, pubSubData, ctx.getChannel())); - AbstractResponseHandler respHandler = handlers.get(pubSubData.operationType); - if (null == respHandler) { - // The above are the only expected PubSubResponse messages received - // from the server for the various client side requests made. - logger.error("Response received from server is for an unhandled operation {}, txnId: {}.", - va(pubSubData.operationType, response.getTxnId())); - pubSubData.getCallback().operationFailed(pubSubData.context, - new UnexpectedConditionException("Can't find response handler for operation " - + pubSubData.operationType)); - return; - } - respHandler.handleResponse(response, pubSubData, ctx.getChannel()); - } - - public void checkTimeoutRequests() { - long curTime = System.currentTimeMillis(); - long timeoutInterval = cfg.getServerAckResponseTimeout(); - for (PubSubData pubSubData : txn2PubSubData.values()) { - checkTimeoutRequest(pubSubData, curTime, timeoutInterval); - } - } - - private void checkTimeoutRequest(PubSubData pubSubData, - long curTime, long timeoutInterval) { - if (curTime > pubSubData.requestWriteTime + timeoutInterval) { - // Current PubSubRequest has timed out so remove it from the - // ResponseHandler's map and invoke the VoidCallback's - // operationFailed method. - logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData); - txn2PubSubData.remove(pubSubData.txnId); - pubSubData.getCallback().operationFailed(pubSubData.context, - new UncertainStateException("Server ack response never received so PubSubRequest has timed out!")); - } - } - - // Logic to deal with what happens when a Channel to a server host is - // disconnected. - @Override - public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - // If this channel was closed explicitly by the client code, - // we do not need to do any of this logic. This could happen - // for redundant Publish channels created or redirected subscribe - // channels that are not used anymore or when we shutdown the - // client and manually close all of the open channels. - // Also don't do any of the disconnect logic if the client has stopped. - if (channelClosedExplicitly || channelManager.isClosed()) { - return; - } - - // Make sure the host retrieved is not null as there could be some weird - // channel disconnect events happening during a client shutdown. - // If it is, just return as there shouldn't be anything we need to do. - InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel()); - if (host == null) { - return; - } - - logger.info("Channel {} was disconnected to host {}.", - va(ctx.getChannel(), host)); - - // If this Channel was used for Publish and Unsubscribe flows, just - // remove it from the HewdigPublisher's host2Channel map. We will - // re-establish a Channel connection to that server when the next - // publish/unsubscribe request to a topic that the server owns occurs. - - // Now determine what type of operation this channel was used for. - if (null == subHandler) { - channelManager.onNonSubscriptionChannelDisconnected(host, ctx.getChannel()); - } else { - channelManager.onSubscriptionChannelDisconnected(host, ctx.getChannel()); - } - - // Finally, all of the PubSubRequests that are still waiting for an ack - // response from the server need to be removed and timed out. Invoke the - // operationFailed callbacks on all of them. Use the - // UncertainStateException since the server did receive the request but - // we're not sure of the state of the request since the ack response was - // never received. - for (PubSubData pubSubData : txn2PubSubData.values()) { - logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}", - pubSubData); - pubSubData.getCallback().operationFailed(pubSubData.context, new UncertainStateException( - "Server ack response never received before server connection disconnected!")); - } - txn2PubSubData.clear(); - } - - // Logic to deal with what happens when a Channel to a server host is - // connected. This is needed if the client is using an SSL port to - // communicate with the server. If so, we need to do the SSL handshake here - // when the channel is first connected. - @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - // No need to initiate the SSL handshake if we are closing this channel - // explicitly or the client has been stopped. - if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) { - logger.debug("Initiating the SSL handshake"); - ctx.getPipeline().get(SslHandler.class).handshake(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - logger.error("Exception caught on client channel", e.getCause()); - e.getChannel().close(); - } - - public void closeExplicitly() { - // TODO: BOOKKEEPER-350 : Handle consume buffering, etc here - in a different patch - channelClosedExplicitly = true; - } -}
