Author: ecn
Date: Thu Jan 31 17:29:07 2013
New Revision: 1441080
URL: http://svn.apache.org/viewvc?rev=1441080&view=rev
Log:
ACCUMULO-966 made client address available again
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1441080&r1=1441079&r2=1441080&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
Thu Jan 31 17:29:07 2013
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
@@ -41,10 +42,10 @@ import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -181,6 +182,34 @@ public class TServerUtils {
}
}
+ public static class THsHaServer extends org.apache.thrift.server.THsHaServer
{
+ public THsHaServer(Args args) {
+ super(args);
+ }
+
+ protected Runnable getRunnable(FrameBuffer frameBuffer) {
+ return new Invocation(frameBuffer);
+ }
+
+ private class Invocation implements Runnable {
+
+ private final FrameBuffer frameBuffer;
+
+ public Invocation(final FrameBuffer frameBuffer) {
+ this.frameBuffer = frameBuffer;
+ }
+
+ public void run() {
+ if (frameBuffer.trans_ instanceof TNonblockingSocket) {
+ TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
+ Socket sock = tsock.getSocketChannel().socket();
+ clientAddress.set(sock.getInetAddress().getHostAddress() + ":" +
sock.getPort());
+ }
+ frameBuffer.invoke();
+ }
+ }
+ }
+
public static ServerPort startHsHaServer(int port, TProcessor processor,
final String serverName, String threadName, final int numThreads,
long timeBetweenThreadChecks, long maxMessageSize) throws
TTransportException {
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
@@ -217,7 +246,7 @@ public class TServerUtils {
}, timeBetweenThreadChecks, timeBetweenThreadChecks);
options.executorService(pool);
processor = new TServerUtils.TimedProcessor(processor, serverName,
threadName);
- options.processorFactory(new ClientInfoProcessorFactory(processor));
+ options.processorFactory(new TProcessorFactory(processor));
return new ServerPort(new THsHaServer(options), port);
}