http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
deleted file mode 100644
index fd58747..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelImpl.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.LinkedList;
-import java.util.Queue;
-
-import com.google.protobuf.ByteString;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import static org.apache.hedwig.util.VarArgs.va;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provide a wrapper over netty channel for Hedwig operations.
- *
- * <p>An instance is created either around an already established netty
- * channel, or with only a target host. In the latter case the connection is
- * started by the first submitted operation. Operations submitted while the
- * channel is not connected are queued; once the connection attempt completes
- * they are written to the channel (on success) or handed to
- * {@link #retryOrFailOp(PubSubData)} (on failure).
- */
-public class HChannelImpl implements HChannel {
-
-    private static final Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
-
-    enum State {
-        DISCONNECTED,
-        CONNECTING,
-        CONNECTED,
-    };
-
-    // Target host. Replaced by the address reported for the netty channel
-    // once a connection attempt has succeeded.
-    InetSocketAddress host;
-    final AbstractHChannelManager channelManager;
-    final ClientChannelPipelineFactory pipelineFactory;
-    // Underlying netty channel; null before the first successful connect,
-    // after a failed connection attempt and after an explicit close.
-    volatile Channel channel;
-    volatile State state;
-
-    // Indicates whether the channel is closed or not.
-    volatile boolean closed = false;
-    // Queue the pubsub requests when the channel is not connected.
-    // Accessed and swapped only while holding this object's monitor.
-    Queue<PubSubData> pendingOps = new ArrayDeque<PubSubData>();
-
-    /**
-     * Create a un-established channel with provided target <code>host</code>.
-     * No pipeline factory is supplied (<code>null</code> is passed on).
-     *
-     * @param host
-     *          Target host address.
-     * @param channelManager
-     *          Channel manager manages the channels.
-     */
-    protected HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager) {
-        this(host, channelManager, null);
-    }
-
-    /**
-     * Create a un-established channel with provided target <code>host</code>.
-     * The channel starts in the <code>DISCONNECTED</code> state.
-     *
-     * @param host
-     *          Target host address.
-     * @param channelManager
-     *          Channel manager manages the channels.
-     * @param pipelineFactory
-     *          Pipeline factory handed to the bootstrap when connecting.
-     */
-    public HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager,
-                        ClientChannelPipelineFactory pipelineFactory) {
-        this(host, null, channelManager, pipelineFactory);
-        // the delegated constructor marks the channel CONNECTED; there is no
-        // netty channel yet, so reset the state here.
-        state = State.DISCONNECTED;
-    }
-
-    /**
-     * Create a <code>HChannel</code> with an established netty channel.
-     * The channel starts in the <code>CONNECTED</code> state.
-     *
-     * @param host
-     *          Target host address.
-     * @param channel
-     *          Established Netty channel.
-     * @param channelManager
-     *          Channel manager manages the channels.
-     * @param pipelineFactory
-     *          Pipeline factory handed to the bootstrap when connecting.
-     */
-    public HChannelImpl(InetSocketAddress host, Channel channel,
-                        AbstractHChannelManager channelManager,
-                        ClientChannelPipelineFactory pipelineFactory) {
-        this.host = host;
-        this.channel = channel;
-        this.channelManager = channelManager;
-        this.pipelineFactory = pipelineFactory;
-        state = State.CONNECTED;
-    }
-
-    /**
-     * Submit a pub/sub operation. It is written immediately when the channel
-     * is connected; otherwise it is queued and a connection attempt is
-     * started (if none is in progress).
-     *
-     * @param pubSubData
-     *          Pub/Sub Operation
-     */
-    @Override
-    public void submitOp(PubSubData pubSubData) {
-        boolean doOpNow = false;
-
-        // common case without lock first
-        if (null != channel && State.CONNECTED == state) {
-            doOpNow = true;
-        } else {
-            synchronized (this) {
-                // check channel & state again under lock
-                if (null != channel && State.CONNECTED == state) {
-                    doOpNow = true;
-                } else {
-                    // if reached here, channel is either null (first connection attempt),
-                    // or the channel is disconnected. Connection attempt is still in progress,
-                    // queue up this op. Op will be executed when connection attempt either
-                    // fails or succeeds
-                    pendingOps.add(pubSubData);
-                }
-            }
-            if (!doOpNow) {
-                // start connection attempt to server
-                connect();
-            }
-        }
-        if (doOpNow) {
-            executeOpAfterConnected(pubSubData);
-        }
-    }
-
-    /**
-     * Execute pub/sub operation after the underlying channel is connected.
-     * A request is built with the next transaction id obtained from the
-     * channel manager and then written to the channel.
-     *
-     * @param pubSubData
-     *          Pub/Sub Operation
-     */
-    private void executeOpAfterConnected(PubSubData pubSubData) {
-        PubSubRequest.Builder reqBuilder =
-            NetUtils.buildPubSubRequest(channelManager.nextTxnId(), pubSubData);
-        writePubSubRequest(pubSubData, reqBuilder.build());
-    }
-
-    /**
-     * @return the underlying netty channel, or <code>null</code> if there is none.
-     */
-    @Override
-    public Channel getChannel() {
-        return channel;
-    }
-
-    /**
-     * Register the operation with the channel's handler and write the request.
-     * If the channel is closed, missing or not connected, or if no handler can
-     * be found for it, the operation is passed to
-     * {@link #retryOrFailOp(PubSubData)} instead.
-     *
-     * @param pubSubData
-     *          Pub/Sub Operation
-     * @param pubSubRequest
-     *          Request built for the operation
-     */
-    private void writePubSubRequest(PubSubData pubSubData, PubSubRequest pubSubRequest) {
-        // Read the volatile field once so that the null check, the handler
-        // lookup and the write all act on the same channel instance.
-        final Channel ch = channel;
-        if (closed || null == ch || State.CONNECTED != state) {
-            retryOrFailOp(pubSubData);
-            return;
-        }
-
-        // Before we do the write, store this information into the
-        // ResponseHandler so when the server responds, we know what
-        // appropriate Callback Data to invoke for the given txn ID.
-        try {
-            getHChannelHandlerFromChannel(ch)
-                .addTxn(pubSubData.txnId, pubSubData);
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No Channel Handler found for channel {} when writing request."
-                        + " It might already disconnect.", ch);
-            // The operation was never registered with a handler, so nothing
-            // else will complete it. Retry or fail it rather than dropping it.
-            retryOrFailOp(pubSubData);
-            return;
-        }
-
-        // Finally, write the pub/sub request through the Channel.
-        logger.debug("Writing a {} request to host: {} for pubSubData: {}.",
-                     va(pubSubData.operationType, host, pubSubData));
-        ChannelFuture future = ch.write(pubSubRequest);
-        future.addListener(new WriteCallback(pubSubData, channelManager));
-    }
-
-    /**
-     * Re-submit operation to default server or fail it.
-     *
-     * <p>If this host is already recorded in the operation's list of servers
-     * that could not be connected to, the operation's callback is failed with
-     * a {@link CouldNotConnectException}. Otherwise the host is recorded and
-     * the operation is submitted to the default server.
-     *
-     * @param pubSubData
-     *          Pub/Sub Operation
-     */
-    protected void retryOrFailOp(PubSubData pubSubData) {
-        // if we were not able to connect to the host, it could be down
-        ByteString hostString = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(host));
-        if (pubSubData.connectFailedServers != null &&
-            pubSubData.connectFailedServers.contains(hostString)) {
-            // We've already tried to connect to this host before so just
-            // invoke the operationFailed callback.
-            logger.error("Error connecting to host {} more than once so fail the request: {}",
-                         va(host, pubSubData));
-            pubSubData.getCallback().operationFailed(pubSubData.context,
-                new CouldNotConnectException("Could not connect to host: " + host));
-        } else {
-            logger.error("Retry to connect to default hub server again for pubSubData: {}",
-                         pubSubData);
-            // Keep track of this current server that we failed to connect
-            // to but retry the request on the default server host/VIP.
-            if (pubSubData.connectFailedServers == null) {
-                pubSubData.connectFailedServers = new LinkedList<ByteString>();
-            }
-            pubSubData.connectFailedServers.add(hostString);
-            channelManager.submitOpToDefaultServer(pubSubData);
-        }
-    }
-
-    /**
-     * Record a successful connection and execute the operations queued while
-     * connecting. If this channel was closed in the meantime, the newly
-     * connected netty channel is closed and nothing else happens.
-     *
-     * @param future
-     *          Completed connect future
-     */
-    private void onChannelConnected(ChannelFuture future) {
-        Queue<PubSubData> oldPendingOps;
-        synchronized (this) {
-            // if the channel is closed by client, do nothing
-            if (closed) {
-                future.getChannel().close();
-                return;
-            }
-            state = State.CONNECTED;
-            channel = future.getChannel();
-            host = NetUtils.getHostFromChannel(channel);
-            // swap the queue under the lock; drain it outside the lock
-            oldPendingOps = pendingOps;
-            pendingOps = new ArrayDeque<PubSubData>();
-        }
-        for (PubSubData op : oldPendingOps) {
-            executeOpAfterConnected(op);
-        }
-    }
-
-    /**
-     * Record a failed connection attempt and retry or fail every operation
-     * that was queued while connecting.
-     */
-    private void onChannelConnectFailure() {
-        Queue<PubSubData> oldPendingOps;
-        synchronized (this) {
-            state = State.DISCONNECTED;
-            channel = null;
-            // swap the queue under the lock; drain it outside the lock
-            oldPendingOps = pendingOps;
-            pendingOps = new ArrayDeque<PubSubData>();
-        }
-        for (PubSubData op : oldPendingOps) {
-            retryOrFailOp(op);
-        }
-    }
-
-    /**
-     * Start a connection attempt to {@link #host} unless the channel is
-     * already connecting or connected. The outcome is handled by a listener
-     * registered on the connect future.
-     */
-    private void connect() {
-        synchronized (this) {
-            if (State.CONNECTING == state ||
-                State.CONNECTED == state) {
-                return;
-            }
-            state = State.CONNECTING;
-        }
-        // Start the connection attempt to the input server host.
-        ChannelFuture future = connect(host, pipelineFactory);
-        future.addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                // If the channel has been closed, there is no need to proceed with any
-                // callback logic here.
-                if (closed) {
-                    future.getChannel().close();
-                    return;
-                }
-
-                if (!future.isSuccess()) {
-                    logger.error("Error connecting to host {}.", host);
-                    future.getChannel().close();
-
-                    // if we were not able to connect to the host, it could be down.
-                    onChannelConnectFailure();
-                    return;
-                }
-                logger.debug("Connected to server {}.", host);
-                // Now that we have connected successfully to the server, execute all queueing
-                // requests.
-                onChannelConnected(future);
-            }
-
-        });
-    }
-
-    /**
-     * This is a helper method to do the connect attempt to the server given the
-     * inputted host/port. This can be used to connect to the default server
-     * host/port which is the VIP. That will pick a server in the cluster at
-     * random to connect to for the initial PubSub attempt (with redirect logic
-     * being done at the server side). Additionally, this could be called after
-     * the client makes an initial PubSub attempt at a server, and is redirected
-     * to the one that is responsible for the topic. Once the connect to the
-     * server is done, we will perform the corresponding PubSub write on that
-     * channel.
-     *
-     * @param serverHost
-     *            Input server host to connect to of type InetSocketAddress
-     * @param pipelineFactory
-     *            PipelineFactory to create response handler to handle responses from
-     *            underlying channel.
-     * @return the future of the connection attempt
-     */
-    protected ChannelFuture connect(InetSocketAddress serverHost,
-                                    ClientChannelPipelineFactory pipelineFactory) {
-        logger.debug("Connecting to host {} ...", serverHost);
-        // Set up the ClientBootStrap so we can create a new Channel connection
-        // to the server.
-        ClientBootstrap bootstrap = new ClientBootstrap(channelManager.getChannelFactory());
-        bootstrap.setPipelineFactory(pipelineFactory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-
-        // Start the connection attempt to the input server host.
-        return bootstrap.connect(serverHost);
-    }
-
-    /**
-     * Close this channel. Only the first call has an effect.
-     *
-     * @param wait
-     *          Whether to block until the underlying netty channel is closed.
-     */
-    @Override
-    public void close(boolean wait) {
-        final Channel ch;
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-            // Take the channel while holding the lock: a connect callback may
-            // replace or null the field concurrently, and every step below
-            // must operate on one and the same instance.
-            ch = channel;
-        }
-        if (null == ch) {
-            return;
-        }
-        try {
-            getHChannelHandlerFromChannel(ch).closeExplicitly();
-        } catch (NoResponseHandlerException nrhe) {
-            logger.warn("No channel handler found for channel {} when closing it.",
-                        ch);
-        }
-        if (wait) {
-            ch.close().awaitUninterruptibly();
-        } else {
-            ch.close();
-        }
-        channel = null;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[HChannel: host - ").append(host)
-          .append(", channel - ").append(channel)
-          .append(", pending reqs - ").append(pendingOps.size())
-          .append(", closed - ").append(closed).append("]");
-        return sb.toString();
-    }
-
-    /**
-     * Close this channel without waiting for the underlying netty channel
-     * to be closed.
-     */
-    @Override
-    public void close() {
-        close(false);
-    }
-
-    /**
-     * Helper static method to get the ResponseHandler instance from a Channel
-     * via the ChannelPipeline it is associated with. The assumption is that the
-     * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
-     *
-     * @param channel
-     *            Channel we are retrieving the ResponseHandler instance for
-     * @return ResponseHandler Instance tied to the Channel's Pipeline
-     * @throws NoResponseHandlerException
-     *             if <code>channel</code> is null or its pipeline has no last handler
-     */
-    public static HChannelHandler getHChannelHandlerFromChannel(Channel channel)
-    throws NoResponseHandlerException {
-        if (null == channel) {
-            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
-        }
-
-        HChannelHandler handler = (HChannelHandler) channel.getPipeline().getLast();
-        if (null == handler) {
-            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
-        }
-        return handler;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
deleted file mode 100644
index a91bbf8..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/NonSubscriptionChannelPipelineFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.handlers.AbstractResponseHandler;
-import org.apache.hedwig.client.handlers.PublishResponseHandler;
-import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-/**
- * Pipeline factory whose response-handler table contains exactly two
- * entries: one for <code>PUBLISH</code> and one for <code>UNSUBSCRIBE</code>.
- */
-public class NonSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {
-
-    /**
-     * @param cfg
-     *          Client configuration, passed on to the superclass.
-     * @param channelManager
-     *          Channel manager, passed on to the superclass.
-     */
-    public NonSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
-                                                 AbstractHChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    /**
-     * Build the operation-type to response-handler table used by this factory.
-     *
-     * @return a new map holding a publish handler and an unsubscribe handler
-     */
-    @Override
-    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
-        // create the handlers first (publish, then unsubscribe) ...
-        final AbstractResponseHandler publishHandler =
-            new PublishResponseHandler(cfg, channelManager);
-        final AbstractResponseHandler unsubscribeHandler =
-            new UnsubscribeResponseHandler(cfg, channelManager);
-
-        // ... then index them by the operation type they respond to
-        final Map<OperationType, AbstractResponseHandler> byOperation =
-            new HashMap<OperationType, AbstractResponseHandler>();
-        byOperation.put(OperationType.PUBLISH, publishHandler);
-        byOperation.put(OperationType.UNSUBSCRIBE, unsubscribeHandler);
-        return byOperation;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
deleted file mode 100644
index 17d2401..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ResubscribeException;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * This class is used when a Subscribe channel gets disconnected and we attempt
- * to resubmit subscribe requests existed in that channel. Once the resubscribe
- * the topic is completed, we need to restart delivery for that topic.
- */
-class ResubscribeCallback implements Callback<ResponseBody> {
-
-    private static final Logger logger = LoggerFactory.getLogger(ResubscribeCallback.class);
-
-    // Private member variables
-    private final TopicSubscriber origTopicSubscriber;
-    private final PubSubData origSubData;
-    private final AbstractHChannelManager channelManager;
-    private final long retryWaitTime;
-
-    /**
-     * @param origTopicSubscriber
-     *          Topic subscriber whose delivery is restarted after a successful resubscribe.
-     * @param origSubData
-     *          Subscribe operation that is resubmitted when a retry is needed.
-     * @param channelManager
-     *          Channel manager used to restart delivery and to resubmit the operation.
-     * @param retryWaitTime
-     *          Delay, in milliseconds, before a retried subscribe request is resubmitted.
-     */
-    ResubscribeCallback(TopicSubscriber origTopicSubscriber,
-                        PubSubData origSubData,
-                        AbstractHChannelManager channelManager,
-                        long retryWaitTime) {
-        this.origTopicSubscriber = origTopicSubscriber;
-        this.origSubData = origSubData;
-        this.channelManager = channelManager;
-        this.retryWaitTime = retryWaitTime;
-    }
-
-    /**
-     * Called when the resubscribe succeeded: restart delivery for the topic
-     * subscriber. If the client turns out not to be subscribed, the subscribe
-     * request is retried.
-     */
-    @Override
-    public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
-        if (logger.isDebugEnabled())
-            logger.debug("Resubscribe succeeded for origSubData: " + origSubData);
-        // Now we want to restart delivery for the subscription channel only
-        // if delivery was started at the time the original subscribe channel
-        // was disconnected.
-        try {
-            channelManager.restartDelivery(origTopicSubscriber);
-        } catch (ClientNotSubscribedException e) {
-            // This exception should never be thrown here but just in case,
-            // log an error and just keep retrying the subscribe request.
-            logger.error("Subscribe was successful but error starting delivery for {} : {}",
-                         va(origTopicSubscriber, e.getMessage()));
-            retrySubscribeRequest();
-        } catch (AlreadyStartDeliveryException asde) {
-            // should not reach here; leave a trace instead of swallowing it
-            // silently so an unexpected occurrence can be diagnosed.
-            logger.warn("Unexpected AlreadyStartDeliveryException when restarting delivery for {} : {}",
-                        va(origTopicSubscriber, asde.getMessage()));
-        }
-    }
-
-    /**
-     * Called when the resubscribe failed. A {@link ResubscribeException} is
-     * only logged; any other failure is retried unless the channel manager
-     * is closed.
-     */
-    @Override
-    public void operationFailed(Object ctx, PubSubException exception) {
-        if (exception instanceof ResubscribeException) {
-            // it might be caused by closesub when resubscribing.
-            // so we don't need to retry resubscribe again
-            logger.warn("Failed to resubscribe {} : but it is caused by closesub when resubscribing. "
-                        + "so we don't need to retry subscribe again.", origSubData);
-            // stop here: falling through would schedule exactly the retry
-            // that the message above says is not needed.
-            return;
-        }
-        // If the resubscribe fails, just keep retrying the subscribe
-        // request. There isn't a way to flag to the application layer that
-        // a topic subscription has failed. So instead, we'll just keep
-        // retrying in the background until success.
-        logger.error("Resubscribe failed with error: " + exception.getMessage());
-        // we don't retry subscribe request is channel manager is closing
-        // otherwise it might overflow the stack.
-        if (!channelManager.isClosed()) {
-            retrySubscribeRequest();
-        }
-    }
-
-    /**
-     * Resubmit the original subscribe request after {@link #retryWaitTime}
-     * milliseconds, unless the channel manager is closed.
-     */
-    private void retrySubscribeRequest() {
-        if (channelManager.isClosed()) {
-            return;
-        }
-        origSubData.clearServersList();
-        logger.debug("Resubmit subscribe request for {} in {} ms later.",
-                     va(origTopicSubscriber, retryWaitTime));
-        channelManager.submitOpAfterDelay(origSubData, retryWaitTime);
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
deleted file mode 100644
index c7a71cb..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/WriteCallback.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty.impl;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-
-import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-/**
- * Listener attached to the future of a pub/sub request write. A successful
- * write is only logged. A failed write removes the operation from the
- * channel's handler, closes the channel and then either resubmits the
- * operation to the default server or, if a write to this host already
- * failed before, fails the operation's callback.
- */
-public class WriteCallback implements ChannelFutureListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(WriteCallback.class);
-
-    // Private member variables
-    private final PubSubData pubSubData;
-    private final HChannelManager channelManager;
-
-    /**
-     * @param pubSubData
-     *          Operation whose request is being written.
-     * @param channelManager
-     *          Channel manager used to check for shutdown and to resubmit the operation.
-     */
-    public WriteCallback(PubSubData pubSubData,
-                         HChannelManager channelManager) {
-        super();
-        this.pubSubData = pubSubData;
-        this.channelManager = channelManager;
-    }
-
-    /**
-     * Invoked when the write completes.
-     *
-     * @param future
-     *          Completed write future
-     */
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-        // If the client has stopped, there is no need to proceed
-        // with any callback logic here.
-        if (channelManager.isClosed()) {
-            future.getChannel().close();
-            return;
-        }
-
-        // When the write operation to the server is done, we just need to check
-        // if it was successful or not.
-        InetSocketAddress host = NetUtils.getHostFromChannel(future.getChannel());
-        if (!future.isSuccess()) {
-            logger.error("Error writing on channel to host: {}", host);
-            // On a write failure for a PubSubRequest, we also want to remove
-            // the saved txnId to PubSubData in the ResponseHandler. These
-            // requests will not receive an ack response from the server
-            // so there is no point storing that information there anymore.
-            try {
-                HChannelHandler channelHandler =
-                    HChannelImpl.getHChannelHandlerFromChannel(future.getChannel());
-                channelHandler.removeTxn(pubSubData.txnId);
-                channelHandler.closeExplicitly();
-            } catch (NoResponseHandlerException e) {
-                // We just couldn't remove the transaction ID's mapping.
-                // The handler was null, so this has been reset anyway.
-                logger.warn("Could not find response handler to remove txnId mapping to pubsub data. Ignoring.");
-            }
-
-            future.getChannel().close();
-
-            // If we were not able to write on the channel to the server host,
-            // the host could have died or something is wrong with the channel
-            // connection where we can connect to the host, but not write to it.
-            //
-            // Keep the textual address next to its ByteString form: the
-            // ByteString is the key stored in writeFailedServers, while the
-            // text is what belongs in a human-readable error message.
-            String hostText = (host == null) ? null : HedwigSocketAddress.sockAddrStr(host);
-            ByteString hostString = (hostText == null) ? null : ByteString.copyFromUtf8(hostText);
-            if (pubSubData.writeFailedServers != null && pubSubData.writeFailedServers.contains(hostString)) {
-                // We've already tried to write to this server previously and
-                // failed, so invoke the operationFailed callback.
-                logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
-                pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
-                                                             "Error while writing message to server: " + hostText));
-            } else {
-                logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: {}",
-                             pubSubData);
-                // Keep track of this current server that we failed to write to
-                // but retry the request on the default server host/VIP.
-                if (pubSubData.writeFailedServers == null)
-                    pubSubData.writeFailedServers = new LinkedList<ByteString>();
-                pubSubData.writeFailedServers.add(hostString);
-                channelManager.submitOpToDefaultServer(pubSubData);
-            }
-        } else {
-            // Now that the write to the server is done, we have to wait for it
-            // to respond. The ResponseHandler will take care of the ack
-            // response from the server before we can determine if the async
-            // PubSub call has really completed successfully or not.
-            logger.debug("Successfully wrote to host: {} for pubSubData: {}", host, pubSubData);
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
deleted file mode 100644
index caa0734..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl.multiplex; - -import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.exceptions.NoResponseHandlerException; -import org.apache.hedwig.client.handlers.SubscribeResponseHandler; -import org.apache.hedwig.client.netty.CleanupChannelMap; -import org.apache.hedwig.client.netty.HChannel; -import org.apache.hedwig.client.netty.NetUtils; -import org.apache.hedwig.client.netty.impl.AbstractHChannelManager; -import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory; -import org.apache.hedwig.client.netty.impl.HChannelHandler; -import org.apache.hedwig.client.netty.impl.HChannelImpl; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.Either; - -import static org.apache.hedwig.util.VarArgs.va; - - -/** - * Multiplex HChannel Manager which establish a connection for multi subscriptions. - */ -public class MultiplexHChannelManager extends AbstractHChannelManager { - - static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class); - - // Find which HChannel that a given TopicSubscriber used. 
protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;

// Index recording which subscription channel serves each topic subscriber.
protected final CleanupChannelMap<TopicSubscriber> sub2Channels;

// Concurrent Map to store the message handler for each topic + sub id combination.
// It is stored here instead of in the subscribe response handler because we don't
// want to lose the handler the user set when a connection is recovered.
protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
    = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();

// PipelineFactory to create subscription netty channels to the appropriate server
private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;

/**
 * Build a manager whose subscription channels are keyed by target host.
 *
 * @param cfg
 *          Client configuration, passed to the super class and the pipeline factory.
 * @param socketFactory
 *          Netty channel factory, passed to the super class.
 */
public MultiplexHChannelManager(ClientConfiguration cfg,
                                ChannelFactory socketFactory) {
    super(cfg, socketFactory);
    subscriptionChannels = new CleanupChannelMap<InetSocketAddress>();
    sub2Channels = new CleanupChannelMap<TopicSubscriber>();
    subscriptionChannelPipelineFactory =
        new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
}

/** @return the pipeline factory created in the constructor. */
@Override
protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
    return subscriptionChannelPipelineFactory;
}

/**
 * Wrap an existing netty channel in an {@link HChannel} and register it under
 * the host the channel is connected to.
 *
 * @param channel
 *          Netty channel to wrap.
 * @return whatever {@link #storeSubscriptionChannel(InetSocketAddress, HChannel)} returns.
 */
@Override
protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
    // store the channel connected to target host for future usage
    InetSocketAddress host = NetUtils.getHostFromChannel(channel);
    HChannel newHChannel = new HChannelImpl(host, channel, this,
                                            getSubscriptionChannelPipelineFactory());
    return storeSubscriptionChannel(host, newHChannel);
}

/**
 * Create an {@link HChannel} for <code>host</code> without an underlying netty
 * channel and register it under that host.
 *
 * @param host
 *          Target host address.
 * @return whatever {@link #storeSubscriptionChannel(InetSocketAddress, HChannel)} returns.
 */
@Override
protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
    HChannel newHChannel = new HChannelImpl(host, this,
                                            getSubscriptionChannelPipelineFactory());
    return storeSubscriptionChannel(host, newHChannel);
}

/**
 * Register <code>newHChannel</code> for <code>host</code>.
 *
 * @return the value returned by <code>subscriptionChannels.addChannel</code>.
 */
private HChannel storeSubscriptionChannel(InetSocketAddress host,
                                          HChannel newHChannel) {
    // here, we guarantee there is only one channel used to communicate with target
    // host.
    return subscriptionChannels.addChannel(host, newHChannel);
}

/** @return the subscription channel registered for <code>host</code>, as reported by the map. */
@Override
protected HChannel getSubscriptionChannel(InetSocketAddress host) {
    return subscriptionChannels.getChannel(host);
}

/**
 * Look up the subscription channel of the host currently recorded as owner of
 * the subscriber's topic.
 *
 * @return <code>null</code> when no host is recorded for the topic, otherwise
 *         the result of {@link #getSubscriptionChannel(InetSocketAddress)}.
 */
protected HChannel getSubscriptionChannel(TopicSubscriber subscriber) {
    InetSocketAddress host = topic2Host.get(subscriber.getTopic());
    if (null == host) {
        // we don't know where is the owner of the topic
        return null;
    } else {
        return getSubscriptionChannel(host);
    }
}

/**
 * Same lookup as {@link #getSubscriptionChannel(TopicSubscriber)}, except that a
 * channel is created and stored when the owner host is known but has no
 * channel registered yet.
 *
 * @return <code>null</code> when no host is recorded for the topic.
 */
@Override
protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
    InetSocketAddress host = topic2Host.get(subscriber.getTopic());
    if (null == host) {
        // we don't know where is the topic
        return null;
    } else {
        // we already know which server owns the topic
        HChannel channel = getSubscriptionChannel(host);
        if (null == channel) {
            // create a channel to connect to the specified host
            channel = createAndStoreSubscriptionChannel(host);
        }
        return channel;
    }
}

/**
 * React to a netty channel disconnect. Nothing happens unless
 * <code>channel</code> is the netty channel of the {@link HChannel} currently
 * registered for <code>host</code>; in that case the registration is removed
 * and, if the removal succeeded, the channel's subscribe response handler is
 * notified.
 */
@Override
protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
                                                 Channel channel) {
    HChannel hChannel = subscriptionChannels.getChannel(host);
    if (null == hChannel) {
        return;
    }
    Channel underlyingChannel = hChannel.getChannel();
    if (null == underlyingChannel ||
        !underlyingChannel.equals(channel)) {
        // the registered channel is a different one; leave it alone
        return;
    }
    logger.info("Subscription Channel {} disconnected from {}.",
                va(channel, host));
    // remove existed channel
    if (subscriptionChannels.removeChannel(host, hChannel)) {
        try {
            HChannelHandler channelHandler =
                HChannelImpl.getHChannelHandlerFromChannel(channel);
            channelHandler.getSubscribeResponseHandler()
                          .onChannelDisconnected(host, channel);
        } catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {} when it disconnected.",
                        channel);
        }
    }
}

/**
 * Find the subscribe response handler attached to the netty channel that
 * serves <code>topicSubscriber</code>.
 *
 * @return <code>null</code> when there is no subscription channel, the channel
 *         has no netty channel, or no handler can be found on it.
 */
@Override
public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
    HChannel hChannel = getSubscriptionChannel(topicSubscriber);
    if (null == hChannel) {
        return null;
    }
    Channel channel = hChannel.getChannel();
    if (null == channel) {
        return null;
    }
    try {
        HChannelHandler channelHandler =
            HChannelImpl.getHChannelHandlerFromChannel(channel);
        return channelHandler.getSubscribeResponseHandler();
    } catch (NoResponseHandlerException nrhe) {
        logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
                    channel, topicSubscriber);
        return null;
    }
}

/** Start delivery with a handler supplied by the caller (not a restart). */
@Override
public void startDelivery(TopicSubscriber topicSubscriber,
                          MessageHandler messageHandler)
    throws ClientNotSubscribedException, AlreadyStartDeliveryException {
    startDelivery(topicSubscriber, messageHandler, false);
}

/** Start delivery again using the handler already recorded for the subscriber. */
@Override
protected void restartDelivery(TopicSubscriber topicSubscriber)
    throws ClientNotSubscribedException, AlreadyStartDeliveryException {
    startDelivery(topicSubscriber, null, true);
}

/**
 * Shared implementation of start and restart delivery.
 *
 * @param topicSubscriber
 *          Subscription to deliver messages for.
 * @param messageHandler
 *          Handler to register; ignored when <code>restart</code> is true.
 * @param restart
 *          When true, the handler already stored for the subscriber is reused.
 * @throws ClientNotSubscribedException
 *          if no subscribe response handler knows this subscription.
 * @throws AlreadyStartDeliveryException
 *          if, on a non-restart call, a handler is already registered.
 */
private void startDelivery(TopicSubscriber topicSubscriber,
                           MessageHandler messageHandler,
                           boolean restart)
    throws ClientNotSubscribedException, AlreadyStartDeliveryException {
    // Make sure we know about this topic subscription on the client side
    // exists. The assumption is that the client should have in memory the
    // Channel created for the TopicSubscriber once the server has sent
    // an ack response to the initial subscribe request.
    SubscribeResponseHandler subscribeResponseHandler =
        getSubscribeResponseHandler(topicSubscriber);
    if (null == subscribeResponseHandler ||
        !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
        logger.error("Client is not yet subscribed to {}.", topicSubscriber);
        throw new ClientNotSubscribedException("Client is not yet subscribed to "
                                               + topicSubscriber);
    }

    MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
    if (restart) {
        // restart using existing msg handler
        messageHandler = existedMsgHandler;
    } else {
        // someone has started delivery but not stopped it
        if (null != existedMsgHandler) {
            throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
        }
        if (messageHandler != null) {
            // putIfAbsent catches a concurrent caller that registered between the get above and here
            if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
                throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
            }
        }
    }

    // tell subscribe response handler to start delivering messages for topicSubscriber
    subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
}

/**
 * Forget the registered message handler and tell the subscribe response
 * handler to stop delivering for <code>topicSubscriber</code>.
 *
 * @throws ClientNotSubscribedException
 *          if no subscribe response handler knows this subscription.
 */
public void stopDelivery(TopicSubscriber topicSubscriber)
    throws ClientNotSubscribedException {
    // Make sure we know that this topic subscription on the client side
    // exists. The assumption is that the client should have in memory the
    // Channel created for the TopicSubscriber once the server has sent
    // an ack response to the initial subscribe request.
    SubscribeResponseHandler subscribeResponseHandler =
        getSubscribeResponseHandler(topicSubscriber);
    if (null == subscribeResponseHandler ||
        !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
        logger.error("Client is not yet subscribed to {}.", topicSubscriber);
        throw new ClientNotSubscribedException("Client is not yet subscribed to "
                                               + topicSubscriber);
    }

    // tell subscribe response handler to stop delivering messages for a given topic subscriber
    topicSubscriber2MessageHandler.remove(topicSubscriber);
    subscribeResponseHandler.stopDelivery(topicSubscriber);
}


/**
 * Close a subscription through its subscribe response handler. When no handler
 * knows the subscription, the callback is completed immediately with a
 * <code>null</code> body.
 */
@Override
public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
                                   final Callback<ResponseBody> callback,
                                   final Object context) {
    SubscribeResponseHandler subscribeResponseHandler =
        getSubscribeResponseHandler(topicSubscriber);
    if (null == subscribeResponseHandler ||
        !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
        logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}",
                    topicSubscriber);
        callback.operationFinished(context, (ResponseBody)null);
        return;
    }
    subscribeResponseHandler.asyncCloseSubscription(topicSubscriber, callback, context);
}

/**
 * Ask the channel handler of every registered subscription channel to check
 * its requests for timeouts. Channels without a netty channel or without a
 * handler are skipped.
 */
@Override
protected void checkTimeoutRequestsOnSubscriptionChannels() {
    // timeout task may be started before constructing subscriptionChannels
    if (null == subscriptionChannels) {
        return;
    }
    for (HChannel channel : subscriptionChannels.getChannels()) {
        Channel underlyingChannel = channel.getChannel();
        if (null == underlyingChannel) {
            // Channels created from a host only are stored before they have a
            // netty channel; there is no channel handler to query yet.
            continue;
        }
        try {
            HChannelHandler channelHandler =
                HChannelImpl.getHChannelHandlerFromChannel(underlyingChannel);
            channelHandler.checkTimeoutRequests();
        } catch (NoResponseHandlerException nrhe) {
            continue;
        }
    }
}

/** Close the host-to-channel map. */
@Override
protected void closeSubscriptionChannels() {
    subscriptionChannels.close();
}

/**
 * Record <code>channel</code> as the channel serving <code>topicSubscriber</code>,
 * replacing the channel the request remembers as its original one.
 *
 * @return an {@link Either} whose left side tells whether the replacement took
 *         place and whose right side is <code>channel</code> on success,
 *         <code>null</code> otherwise.
 */
protected Either<Boolean, HChannel> storeSubscriptionChannel(
    TopicSubscriber topicSubscriber, PubSubData txn, HChannel channel) {
    boolean replaced = sub2Channels.replaceChannel(
        topicSubscriber, txn.getOriginalChannelForResubscribe(), channel);
    if (replaced) {
        return Either.of(replaced, channel);
    } else {
        return Either.of(replaced, null);
    }
}

/**
 * Drop the subscriber-to-channel entry for <code>topicSubscriber</code>.
 *
 * @return the result of <code>sub2Channels.removeChannel</code>.
 */
protected boolean removeSubscriptionChannel(
    TopicSubscriber topicSubscriber, HChannel channel) {
    return sub2Channels.removeChannel(topicSubscriber, channel);
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java deleted file mode 100644 index 9b8ebe0..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */
package org.apache.hedwig.client.netty.impl.multiplex;

import java.net.InetSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;

import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import static org.apache.hedwig.util.VarArgs.va;

/**
 * Subscribe response handler used with {@link MultiplexHChannelManager}. It
 * resolves the {@link HChannel} that wraps the netty channel it receives
 * responses on, and records that channel for each successful subscription.
 */
public class MultiplexSubscribeResponseHandler extends AbstractSubscribeResponseHandler {

    private static final Logger logger =
        LoggerFactory.getLogger(MultiplexSubscribeResponseHandler.class);

    // the underlying subscription channel; set once it has been validated in handleResponse
    volatile HChannel hChannel;
    private final MultiplexHChannelManager sChannelManager;

    /**
     * @param cfg
     *          Client configuration, passed to the super class.
     * @param channelManager
     *          Channel manager; must be a {@link MultiplexHChannelManager}
     *          (it is cast unconditionally).
     */
    protected MultiplexSubscribeResponseHandler(ClientConfiguration cfg,
                                                HChannelManager channelManager) {
        super(cfg, channelManager);
        sChannelManager = (MultiplexHChannelManager) channelManager;
    }

    /**
     * Resolve the subscription {@link HChannel} for <code>channel</code> on first
     * use, then delegate to the super class. If the manager has no channel for
     * the host, or the one it has wraps a different netty channel, the request's
     * callback is failed with an {@link UnexpectedConditionException} and the
     * response is dropped.
     */
    @Override
    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
                               Channel channel) throws Exception {
        if (null == hChannel) {
            InetSocketAddress host = NetUtils.getHostFromChannel(channel);
            // Validate before publishing: caching a mismatched channel in the
            // field would make every later response skip this check.
            HChannel candidate = sChannelManager.getSubscriptionChannel(host);
            if (null == candidate ||
                !channel.equals(candidate.getChannel())) {
                PubSubException pse =
                    new UnexpectedConditionException("Failed to get subscription channel of " + host);
                pubSubData.getCallback().operationFailed(pubSubData.context, pse);
                return;
            }
            hChannel = candidate;
        }
        super.handleResponse(response, pubSubData, channel);
    }

    /**
     * Record the channel serving <code>ts</code> after a successful subscribe ack.
     *
     * @return SUCCESS with the stored channel, or - when the manager refused the
     *         replacement - RESUBSCRIBE_EXCEPTION for a resubscribe request and
     *         CLIENT_ALREADY_SUBSCRIBED otherwise, both with a null channel.
     */
    @Override
    protected Either<StatusCode, HChannel> handleSuccessResponse(
        TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
        // Store the mapping for the TopicSubscriber to the Channel.
        // This is so we can control the starting and stopping of
        // message deliveries from the server on that Channel. Store
        // this only on a successful ack response from the server.
        Either<Boolean, HChannel> result =
            sChannelManager.storeSubscriptionChannel(ts, pubSubData, hChannel);
        if (result.left()) {
            return Either.of(StatusCode.SUCCESS, result.right());
        } else {
            StatusCode code;
            if (pubSubData.isResubscribeRequest()) {
                code = StatusCode.RESUBSCRIBE_EXCEPTION;
            } else {
                code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
            }
            return Either.of(code, null);
        }
    }

    /**
     * Send a CLOSESUBSCRIPTION request over the subscription channel. On success
     * the local subscription state and the manager's subscriber-to-channel entry
     * are removed before the caller's callback is completed. When there is no
     * active subscriber or no channel, the callback is completed immediately.
     */
    @Override
    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
                                       final Callback<ResponseBody> callback,
                                       final Object context) {
        final ActiveSubscriber ss = getActiveSubscriber(topicSubscriber);
        // one read of the volatile field, so the null check, the request and the
        // completion callback all refer to the same channel
        final HChannel closingChannel = hChannel;
        if (null == ss || null == closingChannel) {
            logger.debug("No subscription {} found when closing its subscription from {}.",
                         va(topicSubscriber, closingChannel));
            callback.operationFinished(context, (ResponseBody)null);
            return;
        }
        Callback<ResponseBody> closeCb = new Callback<ResponseBody>() {
            @Override
            public void operationFinished(Object ctx, ResponseBody respBody) {
                removeSubscription(topicSubscriber, ss);
                sChannelManager.removeSubscriptionChannel(topicSubscriber, closingChannel);
                callback.operationFinished(context, null);
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                callback.operationFailed(context, exception);
            }
        };
        PubSubData closeOp = new PubSubData(topicSubscriber.getTopic(), null,
                                            topicSubscriber.getSubscriberId(),
                                            OperationType.CLOSESUBSCRIPTION,
                                            null, closeCb, context);
        closingChannel.submitOp(closeOp);
    }

}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java deleted file mode 100644 index c43108a..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscriptionChannelPipelineFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */
package org.apache.hedwig.client.netty.impl.multiplex;

import java.util.HashMap;
import java.util.Map;

import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.handlers.AbstractResponseHandler;
import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;

/**
 * Pipeline factory for multiplexed subscription channels. The response
 * handlers it installs cover the SUBSCRIBE and CLOSESUBSCRIPTION operations.
 */
public class MultiplexSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory {

    /**
     * @param cfg
     *          Client configuration, handed to the super class.
     * @param channelManager
     *          Multiplex channel manager, handed to the super class.
     */
    public MultiplexSubscriptionChannelPipelineFactory(ClientConfiguration cfg,
                                                       MultiplexHChannelManager channelManager) {
        super(cfg, channelManager);
    }

    /**
     * Build a new handler table with one freshly created handler per supported
     * operation type.
     *
     * @return a mutable map holding the SUBSCRIBE and CLOSESUBSCRIPTION handlers.
     */
    @Override
    protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() {
        // subscribe handler is created first, exactly as before
        AbstractResponseHandler subscribeHandler =
            new MultiplexSubscribeResponseHandler(cfg, channelManager);
        AbstractResponseHandler closeSubscriptionHandler =
            new CloseSubscriptionResponseHandler(cfg, channelManager);

        Map<OperationType, AbstractResponseHandler> byOperation =
            new HashMap<OperationType, AbstractResponseHandler>();
        byOperation.put(OperationType.SUBSCRIBE, subscribeHandler);
        byOperation.put(OperationType.CLOSESUBSCRIPTION, closeSubscriptionHandler);
        return byOperation;
    }

}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java deleted file mode 100644 index fb0a7d9..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java +++ /dev/null @@ -1,358 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.hedwig.client.netty.impl.simple;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import static org.apache.hedwig.util.VarArgs.va;

/**
 * Simple HChannel Manager which establishes a connection for each subscription.
 */
public class SimpleHChannelManager extends AbstractHChannelManager {

    private static final Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);

    // Total number of times a SUBSCRIBE request is submitted (the first attempt
    // plus retries) while the server keeps answering that the topic is busy.
    private static final int MAX_SUBSCRIBE_ATTEMPTS = 5;

    // Concurrent Map to store the cached Channel connections on the client side
    // to a server host for a given Topic + SubscriberId combination. For each
    // TopicSubscriber, we want a unique Channel connection to the server for
    // it. We can also get the ResponseHandler tied to the Channel via the
    // Channel Pipeline.
    protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;

    // Concurrent Map to store the message handler for each topic + sub id combination.
    // It is stored here instead of in the subscribe response handler because we don't
    // want to lose the handler the user set when a connection is recovered.
    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler
        = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();

    // PipelineFactory to create subscription netty channels to the appropriate server
    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;

    /**
     * @param cfg
     *          Client configuration, passed to the super class and the pipeline factory.
     * @param socketFactory
     *          Netty channel factory, passed to the super class.
     */
    public SimpleHChannelManager(ClientConfiguration cfg,
                                 ChannelFactory socketFactory) {
        super(cfg, socketFactory);
        topicSubscriber2Channel = new CleanupChannelMap<TopicSubscriber>();
        this.subscriptionChannelPipelineFactory =
            new SimpleSubscriptionChannelPipelineFactory(cfg, this);
    }

    /**
     * Submit a request. SUBSCRIBE requests get their callback wrapped so that a
     * failure caused by a busy topic is resubmitted instead of being reported,
     * until {@link #MAX_SUBSCRIBE_ATTEMPTS} submissions have been made.
     */
    @Override
    public void submitOp(final PubSubData pubSubData) {
        // In the simple hchannel implementation, if a client closes a subscription
        // and tries to attach to it immediately, it could get a TOPIC_BUSY response.
        // This is because a subscription is closed simply by closing the channel, and
        // the hub side may not have been notified of the channel disconnection event
        // by the time the new subscription request comes in. To solve this, the
        // request is submitted up to 5 times in total.
        // See https://issues.apache.org/jira/browse/BOOKKEEPER-513
        if (OperationType.SUBSCRIBE.equals(pubSubData.operationType)) {
            final Callback<ResponseBody> origCb = pubSubData.getCallback();
            final AtomicInteger retries = new AtomicInteger(MAX_SUBSCRIBE_ATTEMPTS);
            final Callback<ResponseBody> wrapperCb
                = new Callback<ResponseBody>() {
                @Override
                public void operationFinished(Object ctx,
                                              ResponseBody resultOfOperation) {
                    origCb.operationFinished(ctx, resultOfOperation);
                }

                @Override
                public void operationFailed(Object ctx, PubSubException exception) {
                    if (exception instanceof ServiceDownException
                        && exception.getCause() instanceof TopicBusyException
                        && retries.decrementAndGet() > 0) {
                        logger.warn("TOPIC_BUSY from server using simple channel scheme. "
                                    + "This could be due to the channel disconnection from a close"
                                    + " not having been triggered on the server side. Retrying");
                        SimpleHChannelManager.super.submitOp(pubSubData);
                        return;
                    }
                    origCb.operationFailed(ctx, exception);
                }
            };
            pubSubData.setCallback(wrapperCb);
        }
        super.submitOp(pubSubData);
    }

    /** @return the pipeline factory created in the constructor. */
    @Override
    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
        return subscriptionChannelPipelineFactory;
    }

    /**
     * Wrap <code>channel</code> in a new {@link HChannel}. Despite the name,
     * nothing is stored here.
     */
    @Override
    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
        // for simple channel, we don't store subscription channel now
        // we store it until we received success response
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        return new HChannelImpl(host, channel, this,
                                getSubscriptionChannelPipelineFactory());
    }

    /**
     * Create a new {@link HChannel} for <code>host</code> without a netty
     * channel. Despite the name, nothing is stored here.
     */
    @Override
    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
        // for simple channel, we don't store subscription channel now
        // we store it until we received success response
        return new HChannelImpl(host, this,
                                getSubscriptionChannelPipelineFactory());
    }

    /**
     * Wrap <code>channel</code> in a new {@link HChannel} and record it as the
     * channel of <code>topicSubscriber</code>, replacing the channel the request
     * remembers as its original one.
     *
     * @return an {@link Either} whose left side tells whether the replacement
     *         took place and whose right side is the new channel on success,
     *         <code>null</code> otherwise.
     */
    protected Either<Boolean, HChannel> storeSubscriptionChannel(
        TopicSubscriber topicSubscriber, PubSubData txn, Channel channel) {
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        HChannel newHChannel = new HChannelImpl(host, channel, this,
                                                getSubscriptionChannelPipelineFactory());
        boolean replaced = topicSubscriber2Channel.replaceChannel(
            topicSubscriber, txn.getOriginalChannelForResubscribe(), newHChannel);
        if (replaced) {
            return Either.of(replaced, newHChannel);
        } else {
            return Either.of(replaced, null);
        }
    }

    /** Channels are not indexed by host in this manager, so this always returns null. */
    @Override
    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
        return null;
    }

    /**
     * Return the channel recorded for <code>subscriber</code>; when there is
     * none, create a new channel to the host recorded as owner of the topic.
     *
     * @return <code>null</code> when neither a channel nor an owner host is known.
     */
    @Override
    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
        HChannel channel = topicSubscriber2Channel.getChannel(subscriber);
        if (null != channel) {
            // a channel is already recorded for this subscription
            return channel;
        } else {
            // there is no channel established for this subscription
            InetSocketAddress host = topic2Host.get(subscriber.getTopic());
            if (null == host) {
                return null;
            } else {
                channel = getSubscriptionChannel(host);
                if (null == channel) {
                    channel = createAndStoreSubscriptionChannel(host);
                }
                return channel;
            }
        }
    }

    /** Notify the subscribe response handler attached to <code>channel</code> of the disconnect. */
    @Override
    protected void onSubscriptionChannelDisconnected(InetSocketAddress host,
                                                     Channel channel) {
        logger.info("Subscription Channel {} disconnected from {}.",
                    va(channel, host));
        try {
            // get hchannel handler
            HChannelHandler channelHandler =
                HChannelImpl.getHChannelHandlerFromChannel(channel);
            channelHandler.getSubscribeResponseHandler()
                          .onChannelDisconnected(host, channel);
        } catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {} when it disconnected.",
                        channel);
        }
    }

    /**
     * Find the subscribe response handler attached to the netty channel recorded
     * for <code>topicSubscriber</code>.
     *
     * @return <code>null</code> when no channel is recorded, the channel has no
     *         netty channel, or no handler can be found on it.
     */
    @Override
    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
        HChannel hChannel = topicSubscriber2Channel.getChannel(topicSubscriber);
        if (null == hChannel) {
            return null;
        }
        Channel channel = hChannel.getChannel();
        if (null == channel) {
            return null;
        }
        try {
            HChannelHandler channelHandler =
                HChannelImpl.getHChannelHandlerFromChannel(channel);
            return channelHandler.getSubscribeResponseHandler();
        } catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.",
                        channel, topicSubscriber);
            return null;
        }

    }

    /** Start delivery with a handler supplied by the caller (not a restart). */
    @Override
    public void startDelivery(TopicSubscriber topicSubscriber,
                              MessageHandler messageHandler)
        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        startDelivery(topicSubscriber, messageHandler, false);
    }

    /** Start delivery again using the handler already recorded for the subscriber. */
    @Override
    protected void restartDelivery(TopicSubscriber topicSubscriber)
        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        startDelivery(topicSubscriber, null, true);
    }

    /**
     * Shared implementation of start and restart delivery.
     *
     * @param topicSubscriber
     *          Subscription to deliver messages for.
     * @param messageHandler
     *          Handler to register; ignored when <code>restart</code> is true.
     * @param restart
     *          When true, the handler already stored for the subscriber is reused.
     * @throws ClientNotSubscribedException
     *          if no subscribe response handler knows this subscription.
     * @throws AlreadyStartDeliveryException
     *          if, on a non-restart call, a handler is already registered.
     */
    private void startDelivery(TopicSubscriber topicSubscriber,
                               MessageHandler messageHandler, boolean restart)
        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        // Make sure we know about this topic subscription on the client side
        // exists. The assumption is that the client should have in memory the
        // Channel created for the TopicSubscriber once the server has sent
        // an ack response to the initial subscribe request.
        SubscribeResponseHandler subscribeResponseHandler =
            getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler ||
            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
            throw new ClientNotSubscribedException("Client is not yet subscribed to "
                                                   + topicSubscriber);
        }

        MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
        if (restart) {
            // restart using existing msg handler
            messageHandler = existedMsgHandler;
        } else {
            // someone has started delivery but not stopped it
            if (null != existedMsgHandler) {
                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
            }
            if (messageHandler != null) {
                // putIfAbsent catches a concurrent caller that registered between the get above and here
                if (null != topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
                    throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
                }
            }
        }

        // tell subscribe response handler to start delivering messages for topicSubscriber
        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
    }

    /**
     * Forget the registered message handler and tell the subscribe response
     * handler to stop delivering for <code>topicSubscriber</code>.
     *
     * @throws ClientNotSubscribedException
     *          if no subscribe response handler knows this subscription.
     */
    public void stopDelivery(TopicSubscriber topicSubscriber)
        throws ClientNotSubscribedException {
        // Make sure we know that this topic subscription on the client side
        // exists. The assumption is that the client should have in memory the
        // Channel created for the TopicSubscriber once the server has sent
        // an ack response to the initial subscribe request.
        SubscribeResponseHandler subscribeResponseHandler =
            getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler ||
            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", topicSubscriber);
            throw new ClientNotSubscribedException("Client is not yet subscribed to "
                                                   + topicSubscriber);
        }

        // tell subscribe response handler to stop delivering messages for a given topic subscriber
        topicSubscriber2MessageHandler.remove(topicSubscriber);
        subscribeResponseHandler.stopDelivery(topicSubscriber);
    }


    /**
     * Close a subscription by removing its recorded channel and closing the
     * underlying netty channel. The callback is completed once the close future
     * finishes, or immediately when there is no channel to close.
     */
    @Override
    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
                                       final Callback<ResponseBody> callback,
                                       final Object context) {
        HChannel hChannel = topicSubscriber2Channel.removeChannel(topicSubscriber);
        if (null == hChannel) {
            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}",
                        topicSubscriber);
            callback.operationFinished(context, (ResponseBody)null);
            return;
        }

        Channel channel = hChannel.getChannel();
        if (null == channel) {
            callback.operationFinished(context, (ResponseBody)null);
            return;
        }

        try {
            // mark the close as requested by the client before closing the channel
            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
        } catch (NoResponseHandlerException nrhe) {
            // argument order follows the placeholders: subscriber first, then its channel
            logger.warn("No Channel Handler found when closing {}'s channel {}.",
                        topicSubscriber, channel);
        }
        ChannelFuture future = channel.close();
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    logger.error("Failed to close the subscription channel for {}",
                                 topicSubscriber);
                    callback.operationFailed(context, new ServiceDownException(
                        "Failed to close the subscription channel for " + topicSubscriber));
                } else {
                    callback.operationFinished(context, (ResponseBody)null);
                }
            }
        });
    }

    /**
     * Ask the channel handler of every recorded subscription channel to check
     * its requests for timeouts. Channels without a netty channel or without a
     * handler are skipped.
     */
    @Override
    protected void checkTimeoutRequestsOnSubscriptionChannels() {
        // timeout task may be started before constructing topicSubscriber2Channel
        if (null == topicSubscriber2Channel) {
            return;
        }
        for (HChannel channel : topicSubscriber2Channel.getChannels()) {
            Channel underlyingChannel = channel.getChannel();
            if (null == underlyingChannel) {
                // same guard as getSubscribeResponseHandler: nothing to query without a netty channel
                continue;
            }
            try {
                HChannelHandler channelHandler =
                    HChannelImpl.getHChannelHandlerFromChannel(underlyingChannel);
                channelHandler.checkTimeoutRequests();
            } catch (NoResponseHandlerException nrhe) {
                continue;
            }
        }
    }

    /** Close the subscriber-to-channel map. */
    @Override
    protected void closeSubscriptionChannels() {
        topicSubscriber2Channel.close();
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java deleted file mode 100644 index ee5bd90..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl.simple; - - -import java.util.Set; -import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; - - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.netty.HChannel; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.client.netty.impl.AbstractHChannelManager; -import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler; -import org.apache.hedwig.client.netty.impl.ActiveSubscriber; -import org.apache.hedwig.client.netty.impl.HChannelImpl; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.Either; - -public class SimpleSubscribeResponseHandler extends AbstractSubscribeResponseHandler { - - private static final Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class); - - /** - * Simple Active Subscriber enabling client-side throttling. 
- */ - static class SimpleActiveSubscriber extends ActiveSubscriber { - - // Set to store all of the outstanding subscribed messages that are pending - // to be consumed by the client app's MessageHandler. If this ever grows too - // big (e.g. problem at the client end for message consumption), we can - // throttle things by temporarily setting the Subscribe Netty Channel - // to not be readable. When the Set has shrunk sufficiently, we can turn the - // channel back on to read new messages. - private final Set<Message> outstandingMsgSet; - - public SimpleActiveSubscriber(ClientConfiguration cfg, - AbstractHChannelManager channelManager, - TopicSubscriber ts, PubSubData op, - SubscriptionPreferences preferences, - Channel channel, - HChannel hChannel) { - super(cfg, channelManager, ts, op, preferences, channel, hChannel); - outstandingMsgSet = Collections.newSetFromMap( - new ConcurrentHashMap<Message, Boolean>( - cfg.getMaximumOutstandingMessages(), 1.0f)); - } - - @Override - protected void unsafeDeliverMessage(Message message) { - // Add this "pending to be consumed" message to the outstandingMsgSet. - outstandingMsgSet.add(message); - // Check if we've exceeded the max size for the outstanding message set. - if (outstandingMsgSet.size() >= cfg.getMaximumOutstandingMessages() && - channel.isReadable()) { - // Too many outstanding messages so throttle it by setting the Netty - // Channel to not be readable. - if (logger.isDebugEnabled()) { - logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel", - outstandingMsgSet.size()); - } - channel.setReadable(false); - } - super.unsafeDeliverMessage(message); - } - - @Override - public synchronized void messageConsumed(Message message) { - super.messageConsumed(message); - // Remove this consumed message from the outstanding Message Set. - outstandingMsgSet.remove(message); - // Check if we throttled message consumption previously when the - // outstanding message limit was reached. 
For now, only turn the - // delivery back on if there are no more outstanding messages to - // consume. We could make this a configurable parameter if needed. - if (!channel.isReadable() && outstandingMsgSet.size() == 0) { - if (logger.isDebugEnabled()) - logger.debug("Message consumption has caught up so okay to turn off" - + " throttling of messages on the subscribe channel for {}", - topicSubscriber); - channel.setReadable(true); - } - } - - @Override - public synchronized void startDelivery(MessageHandler messageHandler) - throws AlreadyStartDeliveryException, ClientNotSubscribedException { - super.startDelivery(messageHandler); - // Now make the TopicSubscriber Channel readable (it is set to not be - // readable when the initial subscription is done). Note that this is an - // asynchronous call. If this fails (not likely), the futureListener - // will just log an error message for now. - ChannelFuture future = channel.setReadable(true); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - logger.error("Unable to make subscriber Channel readable in startDelivery call for {}", - topicSubscriber); - } - } - }); - } - - @Override - public synchronized void stopDelivery() { - super.stopDelivery(); - // Now make the TopicSubscriber channel not-readable. This will buffer - // up messages if any are sent from the server. Note that this is an - // asynchronous call. If this fails (not likely), the futureListener - // will just log an error message for now. 
- ChannelFuture future = channel.setReadable(false); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - logger.error("Unable to make subscriber Channel not readable in stopDelivery call for {}", - topicSubscriber); - } - } - }); - } - - } - - // Track which subscriber is alive in this response handler - // Which is used for backward compat, since old version hub - // server doesn't carry (topic, subscriberid) in each message. - private volatile TopicSubscriber origTopicSubscriber; - private volatile ActiveSubscriber origActiveSubscriber; - - private SimpleHChannelManager sChannelManager; - - protected SimpleSubscribeResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - super(cfg, channelManager); - sChannelManager = (SimpleHChannelManager) channelManager; - } - - @Override - protected ActiveSubscriber createActiveSubscriber( - ClientConfiguration cfg, AbstractHChannelManager channelManager, - TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences, - Channel channel, HChannel hChannel) { - return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel); - } - - @Override - protected synchronized ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) { - if (null == origTopicSubscriber || !origTopicSubscriber.equals(ts)) { - return null; - } - return origActiveSubscriber; - } - - private synchronized ActiveSubscriber getActiveSubscriber() { - return origActiveSubscriber; - } - - @Override - public synchronized boolean hasSubscription(TopicSubscriber ts) { - if (null == origTopicSubscriber) { - return false; - } - return origTopicSubscriber.equals(ts); - } - - @Override - protected synchronized boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) { - if (null != origTopicSubscriber && !origTopicSubscriber.equals(ts)) { - return false; - } - origTopicSubscriber = 
null; - origActiveSubscriber = null; - return super.removeSubscription(ts, ss); - } - - @Override - public void handleResponse(PubSubResponse response, PubSubData pubSubData, - Channel channel) throws Exception { - // If this was not a successful response to the Subscribe request, we - // won't be using the Netty Channel created so just close it. - if (!response.getStatusCode().equals(StatusCode.SUCCESS)) { - HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly(); - channel.close(); - } - super.handleResponse(response, pubSubData, channel); - } - - @Override - public void handleSubscribeMessage(PubSubResponse response) { - Message message = response.getMessage(); - ActiveSubscriber ss = getActiveSubscriber(); - if (null == ss) { - logger.error("No Subscriber is alive receiving its message {}.", - MessageIdUtils.msgIdToReadableString(message.getMsgId())); - return; - } - ss.handleMessage(message); - } - - @Override - protected Either<StatusCode, HChannel> handleSuccessResponse( - TopicSubscriber ts, PubSubData pubSubData, Channel channel) { - // Store the mapping for the TopicSubscriber to the Channel. - // This is so we can control the starting and stopping of - // message deliveries from the server on that Channel. Store - // this only on a successful ack response from the server. 
- Either<Boolean, HChannel> result = - sChannelManager.storeSubscriptionChannel(ts, pubSubData, channel); - if (result.left()) { - return Either.of(StatusCode.SUCCESS, result.right()); - } else { - StatusCode code; - if (pubSubData.isResubscribeRequest()) { - code = StatusCode.RESUBSCRIBE_EXCEPTION; - } else { - code = StatusCode.CLIENT_ALREADY_SUBSCRIBED; - } - return Either.of(code, null); - } - } - - @Override - protected synchronized void postHandleSuccessResponse( - TopicSubscriber ts, ActiveSubscriber as) { - origTopicSubscriber = ts; - origActiveSubscriber = as; - } - - @Override - public void asyncCloseSubscription(final TopicSubscriber topicSubscriber, - final Callback<ResponseBody> callback, - final Object context) { - // nothing to do just clear status - // channel manager takes the responsibility to close the channel - callback.operationFinished(context, (ResponseBody)null); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java deleted file mode 100644 index d14f053..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty.impl.simple; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.handlers.AbstractResponseHandler; -import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler; -import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; - -public class SimpleSubscriptionChannelPipelineFactory extends ClientChannelPipelineFactory { - - public SimpleSubscriptionChannelPipelineFactory(ClientConfiguration cfg, - SimpleHChannelManager channelManager) { - super(cfg, channelManager); - } - - @Override - protected Map<OperationType, AbstractResponseHandler> createResponseHandlers() { - Map<OperationType, AbstractResponseHandler> handlers = - new HashMap<OperationType, AbstractResponseHandler>(); - handlers.put(OperationType.SUBSCRIBE, - new SimpleSubscribeResponseHandler(cfg, channelManager)); - handlers.put(OperationType.CLOSESUBSCRIPTION, - new CloseSubscriptionResponseHandler(cfg, channelManager)); - return handlers; - } - -}