Author: roger
Date: Tue Jun  5 20:14:14 2012
New Revision: 1346566

URL: http://svn.apache.org/viewvc?rev=1346566&view=rev
Log:
THRIFT-1195 Allow users to act on client connects/disconnects
HIVE-3067 Shutdown HiveMetaStore on client disconnect or timeout
HIVE-3057 metastore.HiveMetaStore$HMSHandler should set the thread local raw 
store to null in shutdown()

Patch: Dragan Okiljevic

Added:
    thrift/trunk/lib/java/src/org/apache/thrift/server/ServerContext.java
    thrift/trunk/lib/java/src/org/apache/thrift/server/TServerEventHandler.java
Modified:
    
thrift/trunk/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
    thrift/trunk/lib/java/src/org/apache/thrift/server/TServer.java
    thrift/trunk/lib/java/src/org/apache/thrift/server/TSimpleServer.java
    thrift/trunk/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
    thrift/trunk/lib/java/test/org/apache/thrift/test/TestServer.java

Modified: 
thrift/trunk/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java?rev=1346566&r1=1346565&r2=1346566&view=diff
==============================================================================
--- 
thrift/trunk/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
 (original)
+++ 
thrift/trunk/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
 Tue Jun  5 20:14:14 2012
@@ -550,4 +550,13 @@ public abstract class AbstractNonblockin
       }
     }
   } // FrameBuffer
+
+  public void setServerEventHandler(TServerEventHandler eventHandler) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  public TServerEventHandler getEventHandler() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
 }

Added: thrift/trunk/lib/java/src/org/apache/thrift/server/ServerContext.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/server/ServerContext.java?rev=1346566&view=auto
==============================================================================
--- thrift/trunk/lib/java/src/org/apache/thrift/server/ServerContext.java 
(added)
+++ thrift/trunk/lib/java/src/org/apache/thrift/server/ServerContext.java Tue 
Jun  5 20:14:14 2012
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Interface for storing server's connection context
+ */
+ 
+package org.apache.thrift.server;
+
+public interface ServerContext {}

Modified: thrift/trunk/lib/java/src/org/apache/thrift/server/TServer.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/server/TServer.java?rev=1346566&r1=1346565&r2=1346566&view=diff
==============================================================================
--- thrift/trunk/lib/java/src/org/apache/thrift/server/TServer.java (original)
+++ thrift/trunk/lib/java/src/org/apache/thrift/server/TServer.java Tue Jun  5 
20:14:14 2012
@@ -125,6 +125,8 @@ public abstract class TServer {
 
   private boolean isServing;
 
+  protected TServerEventHandler eventHandler_;
+
   protected TServer(AbstractServerArgs args) {
     processorFactory_ = args.processorFactory;
     serverTransport_ = args.serverTransport;
@@ -152,4 +154,12 @@ public abstract class TServer {
   protected void setServing(boolean serving) {
     isServing = serving;
   }
+
+  public void setServerEventHandler(TServerEventHandler eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+  public TServerEventHandler getEventHandler() {
+    return eventHandler_;
+  }
 }

Added: 
thrift/trunk/lib/java/src/org/apache/thrift/server/TServerEventHandler.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/server/TServerEventHandler.java?rev=1346566&view=auto
==============================================================================
--- thrift/trunk/lib/java/src/org/apache/thrift/server/TServerEventHandler.java 
(added)
+++ thrift/trunk/lib/java/src/org/apache/thrift/server/TServerEventHandler.java 
Tue Jun  5 20:14:14 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Interface that can handle events from the server core. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+public interface TServerEventHandler {
+
+  /**
+   * Called before the server begins.
+   */
+  void preServe();
+
+  /**
+   * Called when a new client has connected and is about to being processing.
+   */
+  ServerContext createContext(TProtocol input,
+                              TProtocol output);
+
+  /**
+   * Called when a client has finished request-handling to delete server
+   * context.
+   */
+  void deleteContext(ServerContext serverContext,
+                             TProtocol input,
+                             TProtocol output);
+
+  /**
+   * Called when a client is about to call the processor.
+   */
+  void processContext(ServerContext serverContext,
+                              TTransport inputTransport, TTransport 
outputTransport);
+
+}
\ No newline at end of file

Modified: thrift/trunk/lib/java/src/org/apache/thrift/server/TSimpleServer.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/server/TSimpleServer.java?rev=1346566&r1=1346565&r2=1346566&view=diff
==============================================================================
--- thrift/trunk/lib/java/src/org/apache/thrift/server/TSimpleServer.java 
(original)
+++ thrift/trunk/lib/java/src/org/apache/thrift/server/TSimpleServer.java Tue 
Jun  5 20:14:14 2012
@@ -50,6 +50,11 @@ public class TSimpleServer extends TServ
       return;
     }
 
+    // Run the preServe event
+    if (eventHandler_ != null) {
+      eventHandler_.preServe();
+    }
+
     setServing(true);
 
     while (!stopped_) {
@@ -59,6 +64,7 @@ public class TSimpleServer extends TServ
       TTransport outputTransport = null;
       TProtocol inputProtocol = null;
       TProtocol outputProtocol = null;
+      ServerContext connectionContext = null;
       try {
         client = serverTransport_.accept();
         if (client != null) {
@@ -67,7 +73,17 @@ public class TSimpleServer extends TServ
           outputTransport = outputTransportFactory_.getTransport(client);
           inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
           outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-          while (processor.process(inputProtocol, outputProtocol)) {}
+          if (eventHandler_ != null) {
+            connectionContext = eventHandler_.createContext(inputProtocol, 
outputProtocol);
+          }
+          while (true) {
+            if (eventHandler_ != null) {
+              eventHandler_.processContext(connectionContext, inputTransport, 
outputTransport);
+            }
+            if(!processor.process(inputProtocol, outputProtocol)) {
+              break;
+            }
+          }
         }
       } catch (TTransportException ttx) {
         // Client died, just move on
@@ -81,6 +97,10 @@ public class TSimpleServer extends TServ
         }
       }
 
+      if (eventHandler_ != null) {
+        eventHandler_.deleteContext(connectionContext, inputProtocol, 
outputProtocol);
+      }
+
       if (inputTransport != null) {
         inputTransport.close();
       }

Modified: 
thrift/trunk/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java?rev=1346566&r1=1346565&r2=1346566&view=diff
==============================================================================
--- thrift/trunk/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java 
(original)
+++ thrift/trunk/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java 
Tue Jun  5 20:14:14 2012
@@ -108,6 +108,11 @@ public class TThreadPoolServer extends T
       return;
     }
 
+    // Run the preServe event
+    if (eventHandler_ != null) {
+      eventHandler_.preServe();
+    }
+
     stopped_ = false;
     setServing(true);
     while (!stopped_) {
@@ -175,15 +180,33 @@ public class TThreadPoolServer extends T
       TTransport outputTransport = null;
       TProtocol inputProtocol = null;
       TProtocol outputProtocol = null;
+
+      TServerEventHandler eventHandler = null;
+      ServerContext connectionContext = null;
+
       try {
         processor = processorFactory_.getProcessor(client_);
         inputTransport = inputTransportFactory_.getTransport(client_);
         outputTransport = outputTransportFactory_.getTransport(client_);
         inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);  
  
+
+        eventHandler = getEventHandler();
+        if (eventHandler != null) {
+          connectionContext = eventHandler.createContext(inputProtocol, 
outputProtocol);
+        }
         // we check stopped_ first to make sure we're not supposed to be 
shutting
         // down. this is necessary for graceful shutdown.
-        while (!stopped_ && processor.process(inputProtocol, outputProtocol)) 
{}
+        while (true) {
+
+            if (eventHandler != null) {
+              eventHandler.processContext(connectionContext, inputTransport, 
outputTransport);
+            }
+
+            if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
+              break;
+            }
+        }
       } catch (TTransportException ttx) {
         // Assume the client died and continue silently
       } catch (TException tx) {
@@ -192,6 +215,10 @@ public class TThreadPoolServer extends T
         LOGGER.error("Error occurred during processing of message.", x);
       }
 
+      if (eventHandler != null) {
+        eventHandler.deleteContext(connectionContext, inputProtocol, 
outputProtocol);
+      }
+
       if (inputTransport != null) {
         inputTransport.close();
       }

Modified: thrift/trunk/lib/java/test/org/apache/thrift/test/TestServer.java
URL: 
http://svn.apache.org/viewvc/thrift/trunk/lib/java/test/org/apache/thrift/test/TestServer.java?rev=1346566&r1=1346565&r2=1346566&view=diff
==============================================================================
--- thrift/trunk/lib/java/test/org/apache/thrift/test/TestServer.java (original)
+++ thrift/trunk/lib/java/test/org/apache/thrift/test/TestServer.java Tue Jun  
5 20:14:14 2012
@@ -26,13 +26,17 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TServer.Args;
 import org.apache.thrift.server.TSimpleServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.server.ServerTestBase.TestHandler;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
 
 import thrift.test.Insanity;
 import thrift.test.Numberz;
@@ -44,6 +48,51 @@ import thrift.test.Xtruct2;
 
 public class TestServer {
 
+  static class TestServerContext implements ServerContext {
+
+        int connectionId;
+
+        public TestServerContext(int connectionId) {
+            this.connectionId = connectionId;
+        }
+
+        public int getConnectionId() {
+            return connectionId;
+        }
+
+        public void setConnectionId(int connectionId) {
+            this.connectionId = connectionId;
+        }
+
+  }
+
+  static class TestServerEventHandler implements TServerEventHandler {
+
+        private int nextConnectionId = 1;
+
+        public void preServe() {
+            System.out.println("TServerEventHandler.preServe - called only 
once before server starts accepting connections");
+        }
+
+        public ServerContext createContext(TProtocol input, TProtocol output) {
+            //we can create some connection level data which is stored while 
connection is alive & served
+            TestServerContext ctx = new TestServerContext(nextConnectionId++);
+            System.out.println("TServerEventHandler.createContext - connection 
#"+ctx.getConnectionId()+" established");
+            return ctx;
+        }
+
+        public void deleteContext(ServerContext serverContext, TProtocol 
input, TProtocol output) {
+            TestServerContext ctx = (TestServerContext)serverContext;
+            System.out.println("TServerEventHandler.deleteContext - connection 
#"+ctx.getConnectionId()+" terminated");
+        }
+
+        public void processContext(ServerContext serverContext, TTransport 
inputTransport, TTransport outputTransport) {
+            TestServerContext ctx = (TestServerContext)serverContext;
+            System.out.println("TServerEventHandler.processContext - 
connection #"+ctx.getConnectionId()+" is ready to process next request");
+        }
+
+  }
+
   public static void main(String [] args) {
     try {
       int port = 9090;
@@ -74,6 +123,9 @@ public class TestServer {
       // ThreadPool Server
       serverEngine = new TThreadPoolServer(new 
TThreadPoolServer.Args(tServerSocket).processor(testProcessor).protocolFactory(tProtocolFactory));
 
+      //Set server event handler
+      serverEngine.setServerEventHandler(new TestServerEventHandler());
+
       // Run it
       System.out.println("Starting the server on port " + port + "...");
       serverEngine.serve();


Reply via email to