Removing non-blocking server, using only threaded thrift servers now.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5853d86e Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5853d86e Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5853d86e Branch: refs/heads/v2_command Commit: 5853d86e9d7fe3792693c140ba4ac961d438216d Parents: 5c0cfd5 Author: Aaron McCurry <amccu...@gmail.com> Authored: Mon Dec 7 15:09:31 2015 -0500 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Mon Dec 7 15:09:31 2015 -0500 ---------------------------------------------------------------------- .../apache/blur/server/BlurServerContext.java | 9 +- .../blur/thrift/ThriftBlurControllerServer.java | 5 +- .../blur/thrift/ThriftBlurShardServer.java | 2 +- .../org/apache/blur/thrift/ThriftServer.java | 64 +- .../server/AbstractNonblockingServer.java | 614 ----------------- .../apache/blur/thrift/server/Invocation.java | 36 - .../thrift/server/TThreadedSelectorServer.java | 659 ------------------- .../apache/blur/thrift/server/ThriftTrace.java | 23 - .../apache/blur/thrift/server/ThriftTracer.java | 37 -- 9 files changed, 28 insertions(+), 1421 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java b/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java index 542f057..3290f90 100644 --- a/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java +++ b/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java @@ -22,10 +22,8 @@ import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext; import org.apache.blur.thrift.generated.User; -import org.apache.blur.thrift.server.ThriftTrace; -import org.apache.blur.thrift.server.ThriftTracer; -public class BlurServerContext implements ServerContext, ThriftTrace { +public class BlurServerContext implements ServerContext { private static final Log LOG = LogFactory.getLog(BlurServerContext.class); @@ -94,9 +92,4 @@ public class BlurServerContext implements ServerContext, ThriftTrace { _traceRequestId = null; } - @Override - public ThriftTracer getTracer(String name) { - return ThriftTracer.NOTHING; - } - } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java index a1c0f88..a6312bd 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java @@ -118,7 +118,7 @@ public class ThriftBlurControllerServer extends ThriftServer { if (configBindPort == 0) { instanceBindPort = 0; } - TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort, configuration); + TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort); instanceBindPort = ThriftServer.getBindingPort(serverTransport); LOG.info("Controller Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" @@ -187,7 +187,8 @@ public class ThriftBlurControllerServer extends ThriftServer { Trace.setStorage(traceStorage); Trace.setNodeName(nodeName); - List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration, ServerSecurityFilterFactory.ServerType.CONTROLLER); + List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration, + ServerSecurityFilterFactory.ServerType.CONTROLLER); Iface iface = BlurUtil.wrapFilteredBlurServer(configuration, controllerServer, false); iface = ServerSecurityUtil.applySecurity(iface, serverSecurity, false); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java index b7f969c..47772ff 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java @@ -159,7 +159,7 @@ public class ThriftBlurShardServer extends ThriftServer { if (configBindPort == 0) { instanceBindPort = 0; } - TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort, configuration); + TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort); instanceBindPort = ThriftServer.getBindingPort(serverTransport); Set<Entry<String, String>> set = configuration.getProperties().entrySet(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java index 51451fe..e69f2be 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java @@ -60,22 +60,19 @@ import org.apache.blur.server.ServerSecurityFilter; import org.apache.blur.server.ServerSecurityFilterFactory; import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol; import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol; +import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolFactory; import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer; import org.apache.blur.thirdparty.thrift_0_9_0.server.TServerEventHandler; import org.apache.blur.thirdparty.thrift_0_9_0.server.TThreadPoolServer; import org.apache.blur.thirdparty.thrift_0_9_0.server.TThreadPoolServer.Args; import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerSocket; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport; import org.apache.blur.thirdparty.thrift_0_9_0.transport.TServerSocket; import org.apache.blur.thirdparty.thrift_0_9_0.transport.TServerTransport; import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException; +import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportFactory; import org.apache.blur.thrift.generated.Blur; import org.apache.blur.thrift.generated.Blur.Iface; import org.apache.blur.thrift.sasl.SaslHelper; -import org.apache.blur.thrift.sasl.TSaslServerTransport; -import org.apache.blur.thrift.server.TThreadedSelectorServer; -import org.apache.blur.thrift.server.TThreadedSelectorServer.Args.AcceptPolicy; import org.apache.blur.trace.LogTraceStorage; import org.apache.blur.trace.TraceStorage; import org.apache.blur.trace.hdfs.HdfsTraceStorage; @@ -302,34 +299,27 @@ public class ThriftServer { public void start() throws TTransportException, IOException { Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface); + _executorService = Executors.newThreadPool("thrift-processors", _threadCount, false); + TProtocolFactory protocolFactory; + TTransportFactory transportFactory; + if (SaslHelper.isSaslEnabled(_configuration)) { - _executorService = Executors.newThreadPool("thrift-processors", _threadCount, false); - TSaslServerTransport.Factory saslTransportFactory = SaslHelper.getTSaslServerTransportFactory(_configuration); - Args args = new TThreadPoolServer.Args(_serverTransport); - args.executorService(_executorService); - args.processor(processor); - args.protocolFactory(new TCompactProtocol.Factory()); - args.transportFactory(saslTransportFactory); - - _server = new TThreadPoolServer(args); - _server.setServerEventHandler(_eventHandler); + transportFactory = SaslHelper.getTSaslServerTransportFactory(_configuration); + protocolFactory = new TCompactProtocol.Factory(); } else { - _executorService = Executors.newThreadPool("thrift-processors", _threadCount); - TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args( - (TNonblockingServerTransport) _serverTransport); - args.processor(processor); - args.executorService(_executorService); - args.transportFactory(new TFramedTransport.Factory(_maxFrameSize)); - args.protocolFactory(new TBinaryProtocol.Factory(true, true)); - args.selectorThreads = _selectorThreads; - args.maxReadBufferBytes = _maxReadBufferBytes; - args.acceptQueueSizePerThread(_acceptQueueSizePerThread); - args.acceptPolicy(AcceptPolicy.FAIR_ACCEPT); - - _server = new TThreadedSelectorServer(args); - _server.setServerEventHandler(_eventHandler); + transportFactory = new TFramedTransport.Factory(_maxFrameSize); + protocolFactory = new TBinaryProtocol.Factory(true, true); } + Args args = new TThreadPoolServer.Args(_serverTransport); + args.executorService(_executorService); + args.processor(processor); + args.protocolFactory(protocolFactory); + args.transportFactory(transportFactory); + + _server = new TThreadPoolServer(args); + _server.setServerEventHandler(_eventHandler); + LOG.info("Starting server [{0}]", _nodeName); _server.serve(); } @@ -437,21 +427,13 @@ public class ThriftServer { this._configuration = configuration; } - public static TServerTransport getTServerTransport(String bindAddress, int bindPort, BlurConfiguration configuration) - throws TTransportException { + public static TServerTransport getTServerTransport(String bindAddress, int bindPort) throws TTransportException { InetSocketAddress bindInetSocketAddress = getBindInetSocketAddress(bindAddress, bindPort); - if (SaslHelper.isSaslEnabled(configuration)) { - return new TServerSocket(bindInetSocketAddress); - } else { - return new TNonblockingServerSocket(bindInetSocketAddress); - } + return new TServerSocket(bindInetSocketAddress); } public static int getBindingPort(TServerTransport serverTransport) { - if (serverTransport instanceof TNonblockingServerSocket) { - TNonblockingServerSocket nonblockingServerSocket = (TNonblockingServerSocket) serverTransport; - return nonblockingServerSocket.getServerSocket().getLocalPort(); - } else if (serverTransport instanceof TServerSocket) { + if (serverTransport instanceof TServerSocket) { TServerSocket serverSocket = (TServerSocket) serverTransport; return serverSocket.getServerSocket().getLocalPort(); } else { @@ -499,7 +481,7 @@ public class ThriftServer { int sessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT); int slash = zkConnectionStr.indexOf('/'); - if ((slash != -1) && (slash != zkConnectionStr.length()-1)) { + if ((slash != -1) && (slash != zkConnectionStr.length() - 1)) { ZooKeeper rootZk = ZkUtils.newZooKeeper(zkConnectionStr.substring(0, slash), sessionTimeout); String rootPath = zkConnectionStr.substring(slash, zkConnectionStr.length()); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java deleted file mode 100644 index 667c2a4..0000000 --- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java +++ /dev/null @@ -1,614 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.blur.thrift.server; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream; -import org.apache.blur.thirdparty.thrift_0_9_0.TException; -import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol; -import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext; -import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TMemoryInputTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides common methods and classes used by nonblocking TServer - * implementations. - */ -public abstract class AbstractNonblockingServer extends TServer { - protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); - - public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends - AbstractServerArgs<T> { - public long maxReadBufferBytes = Long.MAX_VALUE; - - public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { - super(transport); - transportFactory(new TFramedTransport.Factory()); - } - } - - /** - * The maximum amount of memory we will allocate to client IO buffers at a - * time. Without this limit, the server will gladly allocate client buffers - * right into an out of memory exception, rather than waiting. - */ - private final long MAX_READ_BUFFER_BYTES; - - /** - * How many bytes are currently allocated to read buffers. - */ - private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); - - public AbstractNonblockingServer(AbstractNonblockingServerArgs args) { - super(args); - MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; - } - - /** - * Begin accepting connections and processing invocations. - */ - public void serve() { - // start any IO threads - if (!startThreads()) { - return; - } - - // start listening, or exit - if (!startListening()) { - return; - } - - setServing(true); - - // this will block while we serve - waitForShutdown(); - - setServing(false); - - // do a little cleanup - stopListening(); - } - - /** - * Starts any threads required for serving. - * - * @return true if everything went ok, false if threads could not be started. - */ - protected abstract boolean startThreads(); - - /** - * A method that will block until when threads handling the serving have been - * shut down. - */ - protected abstract void waitForShutdown(); - - /** - * Have the server transport start accepting connections. - * - * @return true if we started listening successfully, false if something went - * wrong. - */ - protected boolean startListening() { - try { - serverTransport_.listen(); - return true; - } catch (TTransportException ttx) { - LOGGER.error("Failed to start listening on server socket!", ttx); - return false; - } - } - - /** - * Stop listening for connections. - */ - protected void stopListening() { - serverTransport_.close(); - } - - /** - * Perform an invocation. This method could behave several different ways - - * invoke immediately inline, queue for separate execution, etc. - * - * @return true if invocation was successfully requested, which is not a - * guarantee that invocation has completed. False if the request - * failed. - */ - protected abstract boolean requestInvoke(FrameBuffer frameBuffer); - - /** - * An abstract thread that handles selecting on a set of transports and - * {@link FrameBuffer FrameBuffers} associated with selected keys - * corresponding to requests. - */ - protected abstract class AbstractSelectThread extends Thread { - protected final Selector selector; - - // List of FrameBuffers that want to change their selection interests. - protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>(); - - public AbstractSelectThread() throws IOException { - this.selector = SelectorProvider.provider().openSelector(); - } - - /** - * If the selector is blocked, wake it up. - */ - public void wakeupSelector() { - selector.wakeup(); - } - - /** - * Add FrameBuffer to the list of select interest changes and wake up the - * selector if it's blocked. When the select() call exits, it'll give the - * FrameBuffer a chance to change its interests. - */ - public void requestSelectInterestChange(FrameBuffer frameBuffer) { - synchronized (selectInterestChanges) { - selectInterestChanges.add(frameBuffer); - } - // wakeup the selector, if it's currently blocked. - selector.wakeup(); - } - - /** - * Check to see if there are any FrameBuffers that have switched their - * interest type from read to write or vice versa. - */ - protected void processInterestChanges() { - synchronized (selectInterestChanges) { - for (FrameBuffer fb : selectInterestChanges) { - fb.changeSelectInterests(); - } - selectInterestChanges.clear(); - } - } - - /** - * Do the work required to read from a readable client. If the frame is - * fully read, then invoke the method call. - */ - protected void handleRead(SelectionKey key) { - FrameBuffer buffer = (FrameBuffer) key.attachment(); - ThriftTracer readTracer = ThriftTracer.NOTHING; - if (buffer.context_ != null) { - ServerContext context = buffer.context_; - if (context instanceof ThriftTrace) { - ThriftTrace thriftTrace = (ThriftTrace) context; - readTracer = thriftTrace.getTracer("thrift - handle read"); - } - } - readTracer.start(); - try { - if (!buffer.read()) { - cleanupSelectionKey(key); - return; - } - } finally { - readTracer.end(); - } - - // if the buffer's frame read is complete, invoke the method. - if (buffer.isFrameFullyRead()) { - ThriftTracer processTracer = ThriftTracer.NOTHING; - if (buffer.context_ != null) { - ServerContext context = buffer.context_; - if (context instanceof ThriftTrace) { - ThriftTrace thriftTrace = (ThriftTrace) context; - processTracer = thriftTrace.getTracer("thrift - handle request"); - } - } - processTracer.start(); - try { - if (!requestInvoke(buffer)) { - cleanupSelectionKey(key); - } - } finally { - processTracer.end(); - } - } - - } - - /** - * Let a writable client get written, if there's data to be written. - */ - protected void handleWrite(SelectionKey key) { - FrameBuffer buffer = (FrameBuffer) key.attachment(); - ThriftTracer writeTracer = ThriftTracer.NOTHING; - if (buffer.context_ != null) { - ServerContext context = buffer.context_; - if (context instanceof ThriftTrace) { - ThriftTrace thriftTrace = (ThriftTrace) context; - writeTracer = thriftTrace.getTracer("thrift - handle write"); - } - } - writeTracer.start(); - try { - if (!buffer.write()) { - cleanupSelectionKey(key); - } - } finally { - writeTracer.end(); - } - } - - /** - * Do connection-close cleanup on a given SelectionKey. - */ - protected void cleanupSelectionKey(SelectionKey key) { - // remove the records from the two maps - FrameBuffer buffer = (FrameBuffer) key.attachment(); - if (buffer != null) { - // close the buffer - buffer.close(); - } - // cancel the selection key - key.cancel(); - } - } // SelectThread - - /** - * Possible states for the FrameBuffer state machine. - */ - private enum FrameBufferState { - // in the midst of reading the frame size off the wire - READING_FRAME_SIZE, - // reading the actual frame data now, but not all the way done yet - READING_FRAME, - // completely read the frame, so an invocation can now happen - READ_FRAME_COMPLETE, - // waiting to get switched to listening for write events - AWAITING_REGISTER_WRITE, - // started writing response data, not fully complete yet - WRITING, - // another thread wants this framebuffer to go back to reading - AWAITING_REGISTER_READ, - // we want our transport and selection key invalidated in the selector - // thread - AWAITING_CLOSE - } - - /** - * Class that implements a sort of state machine around the interaction with a - * client and an invoker. It manages reading the frame size and frame data, - * getting it handed off as wrapped transports, and then the writing of - * response data back to the client. In the process it manages flipping the - * read and write bits on the selection key for its client. - */ - protected class FrameBuffer { - // the actual transport hooked up to the client. - public final TNonblockingTransport trans_; - - // the SelectionKey that corresponds to our transport - private final SelectionKey selectionKey_; - - // the SelectThread that owns the registration of our transport - private final AbstractSelectThread selectThread_; - - // where in the process of reading/writing are we? - private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; - - // the ByteBuffer we'll be using to write and read, depending on the state - private ByteBuffer buffer_; - - private final TByteArrayOutputStream response_; - - // the frame that the TTransport should wrap. - private final TMemoryInputTransport frameTrans_; - - // the transport that should be used to connect to clients - private final TTransport inTrans_; - - private final TTransport outTrans_; - - // the input protocol to use on frames - private final TProtocol inProt_; - - // the output protocol to use on frames - private final TProtocol outProt_; - - // context associated with this connection - private final ServerContext context_; - - public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, - final AbstractSelectThread selectThread) { - trans_ = trans; - selectionKey_ = selectionKey; - selectThread_ = selectThread; - buffer_ = ByteBuffer.allocate(4); - - frameTrans_ = new TMemoryInputTransport(); - response_ = new TByteArrayOutputStream(); - inTrans_ = inputTransportFactory_.getTransport(frameTrans_); - outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); - inProt_ = inputProtocolFactory_.getProtocol(inTrans_); - outProt_ = outputProtocolFactory_.getProtocol(outTrans_); - - if (eventHandler_ != null) { - context_ = eventHandler_.createContext(inProt_, outProt_, selectionKey); - } else { - context_ = null; - } - } - - /** - * Give this FrameBuffer a chance to read. The selector loop should have - * received a read event for this FrameBuffer. - * - * @return true if the connection should live on, false if it should be - * closed - */ - public boolean read() { - if (state_ == FrameBufferState.READING_FRAME_SIZE) { - // try to read the frame size completely - if (!internalRead()) { - return false; - } - - // if the frame size has been read completely, then prepare to read the - // actual frame. - if (buffer_.remaining() == 0) { - // pull out the frame size as an integer. - int frameSize = buffer_.getInt(0); - if (frameSize <= 0) { - LOGGER.error("Read an invalid frame size of " + frameSize - + ". Are you using TFramedTransport on the client side?"); - return false; - } - - // if this frame will always be too large for this server, log the - // error and close the connection. - if (frameSize > MAX_READ_BUFFER_BYTES) { - LOGGER.error("Read a frame size of " + frameSize - + ", which is bigger than the maximum allowable buffer size for ALL connections."); - return false; - } - - // if this frame will push us over the memory limit, then return. - // with luck, more memory will free up the next time around. - if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { - return true; - } - - // increment the amount of memory allocated to read buffers - readBufferBytesAllocated.addAndGet(frameSize + 4); - - // reallocate the readbuffer as a frame-sized buffer - buffer_ = ByteBuffer.allocate(frameSize + 4); - buffer_.putInt(frameSize); - - state_ = FrameBufferState.READING_FRAME; - } else { - // this skips the check of READING_FRAME state below, since we can't - // possibly go on to that state if there's data left to be read at - // this one. - return true; - } - } - - // it is possible to fall through from the READING_FRAME_SIZE section - // to READING_FRAME if there's already some frame data available once - // READING_FRAME_SIZE is complete. - - if (state_ == FrameBufferState.READING_FRAME) { - if (!internalRead()) { - return false; - } - - // since we're already in the select loop here for sure, we can just - // modify our selection key directly. - if (buffer_.remaining() == 0) { - // get rid of the read select interests - selectionKey_.interestOps(0); - state_ = FrameBufferState.READ_FRAME_COMPLETE; - } - - return true; - } - - // if we fall through to this point, then the state must be invalid. - LOGGER.error("Read was called but state is invalid (" + state_ + ")"); - return false; - } - - /** - * Give this FrameBuffer a chance to write its output to the final client. - */ - public boolean write() { - if (state_ == FrameBufferState.WRITING) { - try { - if (trans_.write(buffer_) < 0) { - return false; - } - } catch (IOException e) { - LOGGER.warn("Got an IOException during write!", e); - return false; - } - - // we're done writing. now we need to switch back to reading. - if (buffer_.remaining() == 0) { - prepareRead(); - } - return true; - } - - LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); - return false; - } - - /** - * Give this FrameBuffer a chance to set its interest to write, once data - * has come in. - */ - public void changeSelectInterests() { - if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) { - // set the OP_WRITE interest - selectionKey_.interestOps(SelectionKey.OP_WRITE); - state_ = FrameBufferState.WRITING; - } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) { - prepareRead(); - } else if (state_ == FrameBufferState.AWAITING_CLOSE) { - close(); - selectionKey_.cancel(); - } else { - LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")"); - } - } - - /** - * Shut the connection down. - */ - public void close() { - // if we're being closed due to an error, we might have allocated a - // buffer that we need to subtract for our memory accounting. - if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) { - readBufferBytesAllocated.addAndGet(-buffer_.array().length); - } - trans_.close(); - if (eventHandler_ != null) { - eventHandler_.deleteContext(context_, inProt_, outProt_); - } - } - - /** - * Check if this FrameBuffer has a full frame read. - */ - public boolean isFrameFullyRead() { - return state_ == FrameBufferState.READ_FRAME_COMPLETE; - } - - /** - * After the processor has processed the invocation, whatever thread is - * managing invocations should call this method on this FrameBuffer so we - * know it's time to start trying to write again. Also, if it turns out that - * there actually isn't any data in the response buffer, we'll skip trying - * to write and instead go back to reading. - */ - public void responseReady() { - // the read buffer is definitely no longer in use, so we will decrement - // our read buffer count. we do this here as well as in close because - // we'd like to free this read memory up as quickly as possible for other - // clients. - readBufferBytesAllocated.addAndGet(-buffer_.array().length); - - if (response_.len() == 0) { - // go straight to reading again. this was probably an oneway method - state_ = FrameBufferState.AWAITING_REGISTER_READ; - buffer_ = null; - } else { - buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); - - // set state that we're waiting to be switched to write. we do this - // asynchronously through requestSelectInterestChange() because there is - // a possibility that we're not in the main thread, and thus currently - // blocked in select(). (this functionality is in place for the sake of - // the HsHa server.) - state_ = FrameBufferState.AWAITING_REGISTER_WRITE; - } - requestSelectInterestChange(); - } - - /** - * Actually invoke the method signified by this FrameBuffer. - */ - public void invoke() { - frameTrans_.reset(buffer_.array()); - response_.reset(); - - try { - if (eventHandler_ != null) { - eventHandler_.processContext(context_, inTrans_, outTrans_); - } - processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_); - responseReady(); - return; - } catch (TException te) { - LOGGER.warn("Exception while invoking!", te); - } catch (Throwable t) { - LOGGER.error("Unexpected throwable while invoking!", t); - } - // This will only be reached when there is a throwable. - state_ = FrameBufferState.AWAITING_CLOSE; - requestSelectInterestChange(); - } - - /** - * Perform a read into buffer. - * - * @return true if the read succeeded, false if there was an error or the - * connection closed. - */ - private boolean internalRead() { - try { - if (trans_.read(buffer_) < 0) { - return false; - } - return true; - } catch (IOException e) { - LOGGER.warn("Got an IOException in internalRead!", e); - return false; - } - } - - /** - * We're done writing, so reset our interest ops and change state - * accordingly. - */ - private void prepareRead() { - // we can set our interest directly without using the queue because - // we're in the select thread. - selectionKey_.interestOps(SelectionKey.OP_READ); - // get ready for another go-around - buffer_ = ByteBuffer.allocate(4); - state_ = FrameBufferState.READING_FRAME_SIZE; - } - - /** - * When this FrameBuffer needs to change its select interests and execution - * might not be in its select thread, then this method will make sure the - * interest change gets done when the select thread wakes back up. When the - * current thread is this FrameBuffer's select thread, then it just does the - * interest change immediately. - */ - private void requestSelectInterestChange() { - if (Thread.currentThread() == this.selectThread_) { - changeSelectInterests(); - } else { - this.selectThread_.requestSelectInterestChange(this); - } - } - } // FrameBuffer -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java deleted file mode 100644 index fe71e11..0000000 --- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.blur.thrift.server; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import org.apache.blur.thrift.server.AbstractNonblockingServer.FrameBuffer; - -/** - * An Invocation represents a method call that is prepared to execute, given an - * idle worker thread. It contains the input and output protocols the thread's - * processor should use to perform the usual Thrift invocation. - */ -class Invocation implements Runnable { - private final FrameBuffer frameBuffer; - - public Invocation(final FrameBuffer frameBuffer) { - this.frameBuffer = frameBuffer; - } - - public void run() { - frameBuffer.invoke(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java deleted file mode 100644 index b58b063..0000000 --- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java +++ /dev/null @@ -1,659 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.blur.thrift.server; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; - -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport; -import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Half-Sync/Half-Async server with a separate pool of threads to handle - * non-blocking I/O. Accepts are handled on a single thread, and a configurable - * number of nonblocking selector threads manage reading and writing of client - * connections. A synchronous worker thread pool handles processing of requests. - * - * Performs better than TNonblockingServer/THsHaServer in multi-core - * environments when the the bottleneck is CPU on the single selector thread - * handling I/O. In addition, because the accept handling is decoupled from - * reads/writes and invocation, the server has better ability to handle back- - * pressure from new connections (e.g. stop accepting when busy). - * - * Like TNonblockingServer, it relies on the use of TFramedTransport. - */ -public class TThreadedSelectorServer extends AbstractNonblockingServer { - private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName()); - - private static int selectorThreadCount; - - public static class Args extends AbstractNonblockingServerArgs<Args> { - - /** The number of threads for selecting on already-accepted connections */ - public int selectorThreads = 2; - /** - * The size of the executor service (if none is specified) that will handle - * invocations. This may be set to 0, in which case invocations will be - * handled directly on the selector threads (as is in TNonblockingServer) - */ - private int workerThreads = 5; - /** Time to wait for server to stop gracefully */ - private int stopTimeoutVal = 60; - private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; - /** The ExecutorService for handling dispatched requests */ - private ExecutorService executorService = null; - /** - * The size of the blocking queue per selector thread for passing accepted - * connections to the selector thread - */ - private int acceptQueueSizePerThread = 4; - - /** - * Determines the strategy for handling new accepted connections. - */ - public static enum AcceptPolicy { - /** - * Require accepted connection registration to be handled by the executor. - * If the worker pool is saturated, further accepts will be closed - * immediately. Slightly increases latency due to an extra scheduling. - */ - FAIR_ACCEPT, - /** - * Handle the accepts as fast as possible, disregarding the status of the - * executor service. - */ - FAST_ACCEPT - } - - private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT; - - public Args(TNonblockingServerTransport transport) { - super(transport); - } - - public Args selectorThreads(int i) { - selectorThreads = i; - return this; - } - - public int getSelectorThreads() { - return selectorThreads; - } - - public Args workerThreads(int i) { - workerThreads = i; - return this; - } - - public int getWorkerThreads() { - return workerThreads; - } - - public int getStopTimeoutVal() { - return stopTimeoutVal; - } - - public Args stopTimeoutVal(int stopTimeoutVal) { - this.stopTimeoutVal = stopTimeoutVal; - return this; - } - - public TimeUnit getStopTimeoutUnit() { - return stopTimeoutUnit; - } - - public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) { - this.stopTimeoutUnit = stopTimeoutUnit; - return this; - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public Args executorService(ExecutorService executorService) { - this.executorService = executorService; - return this; - } - - public int getAcceptQueueSizePerThread() { - return acceptQueueSizePerThread; - } - - public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) { - this.acceptQueueSizePerThread = acceptQueueSizePerThread; - return this; - } - - public AcceptPolicy getAcceptPolicy() { - return acceptPolicy; - } - - public Args acceptPolicy(AcceptPolicy acceptPolicy) { - this.acceptPolicy = acceptPolicy; - return this; - } - - public void validate() { - if (selectorThreads <= 0) { - throw new IllegalArgumentException("selectorThreads must be positive."); - } - if (workerThreads < 0) { - throw new IllegalArgumentException("workerThreads must be non-negative."); - } - if (acceptQueueSizePerThread <= 0) { - throw new IllegalArgumentException("acceptQueueSizePerThread must be positive."); - } - } - } - - // Flag for stopping the server - // Please see THRIFT-1795 for the usage of this flag - private volatile boolean stopped_ = false; - - // The thread handling all accepts - private AcceptThread acceptThread; - - // Threads handling events on client transports - private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); - - // This wraps all the functionality of queueing and thread pool management - // for the passing of Invocations from the selector thread(s) to the workers - // (if any). - private final ExecutorService invoker; - - private final Args args; - - /** - * Create the server with the specified Args configuration - */ - public TThreadedSelectorServer(Args args) { - super(args); - args.validate(); - invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService; - this.args = args; - } - - /** - * Start the accept and selector threads running to deal with clients. - * - * @return true if everything went ok, false if we couldn't start for some - * reason. - */ - @Override - protected boolean startThreads() { - try { - for (int i = 0; i < args.selectorThreads; ++i) { - selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); - } - acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, - createSelectorThreadLoadBalancer(selectorThreads)); - for (SelectorThread thread : selectorThreads) { - thread.start(); - } - acceptThread.start(); - return true; - } catch (IOException e) { - LOGGER.error("Failed to start threads!", e); - return false; - } - } - - /** - * Joins the accept and selector threads and shuts down the executor service. - */ - @Override - protected void waitForShutdown() { - try { - joinThreads(); - } catch (InterruptedException e) { - // Non-graceful shutdown occurred - LOGGER.error("Interrupted while joining threads!", e); - } - gracefullyShutdownInvokerPool(); - } - - protected void joinThreads() throws InterruptedException { - // wait until the io threads exit - acceptThread.join(); - for (SelectorThread thread : selectorThreads) { - thread.join(); - } - } - - /** - * Stop serving and shut everything down. - */ - @Override - public void stop() { - stopped_ = true; - - // Stop queuing connect attempts asap - stopListening(); - - if (acceptThread != null) { - acceptThread.wakeupSelector(); - } - if (selectorThreads != null) { - for (SelectorThread thread : selectorThreads) { - if (thread != null) - thread.wakeupSelector(); - } - } - } - - protected void gracefullyShutdownInvokerPool() { - // try to gracefully shut down the executor service - invoker.shutdown(); - - // Loop until awaitTermination finally does return without a interrupted - // exception. If we don't do this, then we'll shut down prematurely. We want - // to let the executorService clear it's task queue, closing client sockets - // appropriately. - long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); - long now = System.currentTimeMillis(); - while (timeoutMS >= 0) { - try { - invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); - break; - } catch (InterruptedException ix) { - long newnow = System.currentTimeMillis(); - timeoutMS -= (newnow - now); - now = newnow; - } - } - } - - /** - * We override the standard invoke method here to queue the invocation for - * invoker service instead of immediately invoking. If there is no thread - * pool, handle the invocation inline on this thread - */ - @Override - protected boolean requestInvoke(FrameBuffer frameBuffer) { - Runnable invocation = getRunnable(frameBuffer); - if (invoker != null) { - try { - invoker.execute(invocation); - return true; - } catch (RejectedExecutionException rx) { - LOGGER.warn("ExecutorService rejected execution!", rx); - return false; - } - } else { - // Invoke on the caller's thread - invocation.run(); - return true; - } - } - - protected Runnable getRunnable(FrameBuffer frameBuffer) { - return new Invocation(frameBuffer); - } - - /** - * Helper to create the invoker if one is not specified - */ - protected static ExecutorService createDefaultExecutor(Args options) { - return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null; - } - - private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) { - if (queueSize == 0) { - // Unbounded queue - return new LinkedBlockingQueue<TNonblockingTransport>(); - } - return new ArrayBlockingQueue<TNonblockingTransport>(queueSize); - } - - /** - * The thread that selects on the server transport (listen socket) and accepts - * new connections to hand off to the IO selector threads - */ - protected class AcceptThread extends Thread { - - // The listen socket to accept on - private final TNonblockingServerTransport serverTransport; - private final Selector acceptSelector; - - private final SelectorThreadLoadBalancer threadChooser; - - /** - * Set up the AcceptThead - * - * @throws IOException - */ - public AcceptThread(TNonblockingServerTransport serverTransport, SelectorThreadLoadBalancer threadChooser) - throws IOException { - this.serverTransport = serverTransport; - this.threadChooser = threadChooser; - this.acceptSelector = SelectorProvider.provider().openSelector(); - this.serverTransport.registerSelector(acceptSelector); - } - - /** - * The work loop. Selects on the server transport and accepts. If there was - * a server transport that had blocking accepts, and returned on blocking - * client transports, that should be used instead - */ - public void run() { - try { - if (eventHandler_ != null) { - eventHandler_.preServe(); - } - - while (!stopped_) { - select(); - } - } catch (Throwable t) { - LOGGER.error("run() exiting due to uncaught error", t); - } finally { - // This will wake up the selector threads - TThreadedSelectorServer.this.stop(); - } - } - - /** - * If the selector is blocked, wake it up. - */ - public void wakeupSelector() { - acceptSelector.wakeup(); - } - - /** - * Select and process IO events appropriately: If there are connections to - * be accepted, accept them. - */ - private void select() { - try { - // wait for connect events. - acceptSelector.select(); - - // process the io events we received - Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator(); - while (!stopped_ && selectedKeys.hasNext()) { - SelectionKey key = selectedKeys.next(); - selectedKeys.remove(); - - // skip if not valid - if (!key.isValid()) { - continue; - } - - if (key.isAcceptable()) { - handleAccept(); - } else { - LOGGER.warn("Unexpected state in select! " + key.interestOps()); - } - } - } catch (IOException e) { - LOGGER.warn("Got an IOException while selecting!", e); - } - } - - /** - * Accept a new connection. - */ - private void handleAccept() { - final TNonblockingTransport client = doAccept(); - if (client != null) { - // Pass this connection to a selector thread - final SelectorThread targetThread = threadChooser.nextThread(); - - if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { - doAddAccept(targetThread, client); - } else { - // FAIR_ACCEPT - try { - invoker.submit(new Runnable() { - public void run() { - doAddAccept(targetThread, client); - } - }); - } catch (RejectedExecutionException rx) { - LOGGER.warn("ExecutorService rejected accept registration!", rx); - // close immediately - client.close(); - } - } - } - } - - private TNonblockingTransport doAccept() { - try { - return (TNonblockingTransport) serverTransport.accept(); - } catch (TTransportException tte) { - // something went wrong accepting. - LOGGER.warn("Exception trying to accept!", tte); - return null; - } - } - - private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { - if (!thread.addAcceptedConnection(client)) { - client.close(); - } - } - } // AcceptThread - - /** - * The SelectorThread(s) will be doing all the selecting on accepted active - * connections. - */ - protected class SelectorThread extends AbstractSelectThread { - - // Accepted connections added by the accept thread. - private final BlockingQueue<TNonblockingTransport> acceptedQueue; - - /** - * Set up the SelectorThread with an unbounded queue for incoming accepts. - * - * @throws IOException - * if a selector cannot be created - */ - public SelectorThread() throws IOException { - this(new LinkedBlockingQueue<TNonblockingTransport>()); - } - - /** - * Set up the SelectorThread with an bounded queue for incoming accepts. - * - * @throws IOException - * if a selector cannot be created - */ - public SelectorThread(int maxPendingAccepts) throws IOException { - this(createDefaultAcceptQueue(maxPendingAccepts)); - } - - /** - * Set up the SelectorThread with a specified queue for connections. - * - * @param acceptedQueue - * The BlockingQueue implementation for holding incoming accepted - * connections. - * @throws IOException - * if a selector cannot be created. - */ - public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException { - this.acceptedQueue = acceptedQueue; - } - - /** - * Hands off an accepted connection to be handled by this thread. This - * method will block if the queue for new connections is at capacity. - * - * @param accepted - * The connection that has been accepted. - * @return true if the connection has been successfully added. - */ - public boolean addAcceptedConnection(TNonblockingTransport accepted) { - try { - acceptedQueue.put(accepted); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while adding accepted connection!", e); - return false; - } - selector.wakeup(); - return true; - } - - /** - * The work loop. Handles selecting (read/write IO), dispatching, and - * managing the selection preferences of all existing connections. - */ - public void run() { - try { - Thread thread = Thread.currentThread(); - if (thread.getName().startsWith("Thread-")) { - thread.setName("TThreadedSelectorServer-SelectorThread-" + selectorThreadCount++); - } - while (!stopped_) { - select(); - processAcceptedConnections(); - processInterestChanges(); - } - for (SelectionKey selectionKey : selector.keys()) { - cleanupSelectionKey(selectionKey); - } - } catch (Throwable t) { - LOGGER.error("run() exiting due to uncaught error", t); - } finally { - // This will wake up the accept thread and the other selector threads - TThreadedSelectorServer.this.stop(); - } - } - - /** - * Select and process IO events appropriately: If there are existing - * connections with data waiting to be read, read it, buffering until a - * whole frame has been read. If there are any pending responses, buffer - * them until their target client is available, and then send the data. - */ - private void select() { - try { - // wait for io events. - selector.select(); - - // process the io events we received - Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); - while (!stopped_ && selectedKeys.hasNext()) { - SelectionKey key = selectedKeys.next(); - selectedKeys.remove(); - - // skip if not valid - if (!key.isValid()) { - cleanupSelectionKey(key); - continue; - } - - if (key.isReadable()) { - // deal with reads - handleRead(key); - } else if (key.isWritable()) { - // deal with writes - handleWrite(key); - } else { - LOGGER.warn("Unexpected state in select! " + key.interestOps()); - } - } - } catch (IOException e) { - LOGGER.warn("Got an IOException while selecting!", e); - } - } - - private void processAcceptedConnections() { - // Register accepted connections - while (!stopped_) { - TNonblockingTransport accepted = acceptedQueue.poll(); - if (accepted == null) { - break; - } - registerAccepted(accepted); - } - } - - private void registerAccepted(TNonblockingTransport accepted) { - SelectionKey clientKey = null; - try { - clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); - - FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); - clientKey.attach(frameBuffer); - } catch (IOException e) { - LOGGER.warn("Failed to register accepted connection to selector!", e); - if (clientKey != null) { - cleanupSelectionKey(clientKey); - } - accepted.close(); - } - } - } // SelectorThread - - /** - * Creates a SelectorThreadLoadBalancer to be used by the accept thread for - * assigning newly accepted connections across the threads. - */ - protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) { - return new SelectorThreadLoadBalancer(threads); - } - - /** - * A round robin load balancer for choosing selector threads for new - * connections. - */ - protected class SelectorThreadLoadBalancer { - private final Collection<? extends SelectorThread> threads; - private Iterator<? extends SelectorThread> nextThreadIterator; - - public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) { - if (threads.isEmpty()) { - throw new IllegalArgumentException("At least one selector thread is required"); - } - this.threads = Collections.unmodifiableList(new ArrayList<T>(threads)); - nextThreadIterator = this.threads.iterator(); - } - - public SelectorThread nextThread() { - // Choose a selector thread (round robin) - if (!nextThreadIterator.hasNext()) { - nextThreadIterator = threads.iterator(); - } - return nextThreadIterator.next(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java deleted file mode 100644 index 3a42983..0000000 --- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.blur.thrift.server; - -public interface ThriftTrace { - - ThriftTracer getTracer(String name); - -} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java deleted file mode 100644 index c8a9937..0000000 --- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.blur.thrift.server; - -public abstract class ThriftTracer { - - public static final ThriftTracer NOTHING = new ThriftTracer() { - @Override - public void start() { - - } - - @Override - public void end() { - - } - }; - - public abstract void start(); - - public abstract void end(); - -}