This is an automated email from the ASF dual-hosted git repository.

jking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new 393f6c9  THRIFT-3877: fix py/py3 server, java client with http 
transport
393f6c9 is described below

commit 393f6c93e1a65b7be74d79b5a6b00f878e88a630
Author: James E. King III <[email protected]>
AuthorDate: Sat Feb 9 10:35:44 2019 -0500

    THRIFT-3877: fix py/py3 server, java client with http transport
    
    The java TestClient asks the server to run a oneway request that
    sleeps for 3 seconds.  If the java TestClient sees the duration
    of the call exceed 200 milliseconds, it fails the test.  This means the
    server did not participate in the "fire and forget" dynamics of
    ONEWAY requests.  In this case the THttpServer was processing the
    RPC before sending the transport response.  The fix was to enhance
    the TProcessor so that the THttpServer has an opportunity to inspect
    the message header before processing the RPC.
    
    This is partly due to THttpServer violating the layered
    architecture.  It is essentially implementing a combined
    server and transport, whereas there should be a distinct server,
    protocol, and transport separation.  Many languages seem to have
    this problem where HTTP was introduced.
---
 compiler/cpp/src/thrift/generate/t_py_generator.cc | 11 +++++++
 .../test/org/apache/thrift/test/TestClient.java    | 11 +++++--
 lib/py/src/TMultiplexedProcessor.py                |  4 +++
 lib/py/src/Thrift.py                               | 12 +++++++
 lib/py/src/server/THttpServer.py                   | 37 ++++++++++++++++++----
 test/known_failures_Linux.json                     | 32 +------------------
 6 files changed, 67 insertions(+), 40 deletions(-)

diff --git a/compiler/cpp/src/thrift/generate/t_py_generator.cc 
b/compiler/cpp/src/thrift/generate/t_py_generator.cc
index c16d6d3..83462f4 100644
--- a/compiler/cpp/src/thrift/generate/t_py_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_py_generator.cc
@@ -1831,6 +1831,13 @@ void t_py_generator::generate_service_server(t_service* 
tservice) {
     f_service_ << indent() << "self._processMap[\"" << (*f_iter)->get_name()
                << "\"] = Processor.process_" << (*f_iter)->get_name() << endl;
   }
+  f_service_ << indent() << "self._on_message_begin = None" << endl;
+  indent_down();
+  f_service_ << endl;
+
+  f_service_ << indent() << "def on_message_begin(self, func):" << endl;
+  indent_up();
+    f_service_ << indent() << "self._on_message_begin = func" << endl;
   indent_down();
   f_service_ << endl;
 
@@ -1839,6 +1846,10 @@ void t_py_generator::generate_service_server(t_service* 
tservice) {
   indent_up();
 
   f_service_ << indent() << "(name, type, seqid) = iprot.readMessageBegin()" 
<< endl;
+  f_service_ << indent() << "if self._on_message_begin:" << endl;
+  indent_up();
+    f_service_ << indent() << "self._on_message_begin(name, type, seqid)" << 
endl;
+  indent_down();
 
   // TODO(mcslee): validate message
 
diff --git a/lib/java/test/org/apache/thrift/test/TestClient.java 
b/lib/java/test/org/apache/thrift/test/TestClient.java
index feaa972..84410ce 100644
--- a/lib/java/test/org/apache/thrift/test/TestClient.java
+++ b/lib/java/test/org/apache/thrift/test/TestClient.java
@@ -752,13 +752,18 @@ public class TestClient {
         testClient.testOneway(3);
         long onewayElapsedMillis = (System.nanoTime() - startOneway) / 1000000;
         if (onewayElapsedMillis > 200) {
-          System.out.println("Oneway test failed: took " +
+          System.out.println("Oneway test took too long to execute failed: 
took " +
                              Long.toString(onewayElapsedMillis) +
                              "ms");
-          System.out.printf("*** FAILURE ***\n");
+          System.out.println("oneway calls are 'fire and forget' and therefore 
should not cause blocking.");
+          System.out.println("Some transports (HTTP) have a required response, 
and typically this failure");
+          System.out.println("means the transport response was delayed until 
after the execution");
+          System.out.println("of the RPC.  The server should post the 
transport response immediately and");
+          System.out.println("before executing the RPC.");
+          System.out.println("*** FAILURE ***");
           returnCode |= ERR_BASETYPES;
         } else {
-          System.out.println("Success - took " +
+          System.out.println("Success - fire and forget only took " +
                              Long.toString(onewayElapsedMillis) +
                              "ms");
         }
diff --git a/lib/py/src/TMultiplexedProcessor.py 
b/lib/py/src/TMultiplexedProcessor.py
index bd10d9b..ff88430 100644
--- a/lib/py/src/TMultiplexedProcessor.py
+++ b/lib/py/src/TMultiplexedProcessor.py
@@ -39,6 +39,10 @@ class TMultiplexedProcessor(TProcessor):
     def registerProcessor(self, serviceName, processor):
         self.services[serviceName] = processor
 
+    def on_message_begin(self, func):
+        for key in self.services.keys():
+            self.services[key].on_message_begin(func)
+
     def process(self, iprot, oprot):
         (name, type, seqid) = iprot.readMessageBegin()
         if type != TMessageType.CALL and type != TMessageType.ONEWAY:
diff --git a/lib/py/src/Thrift.py b/lib/py/src/Thrift.py
index 00941d8..c390cbb 100644
--- a/lib/py/src/Thrift.py
+++ b/lib/py/src/Thrift.py
@@ -72,6 +72,18 @@ class TProcessor(object):
     """Base class for processor, which works on two streams."""
 
     def process(self, iprot, oprot):
+        """
+        Process a request.  The normal behvaior is to have the
+        processor invoke the correct handler and then it is the
+        server's responsibility to write the response to oprot.
+        """
+        pass
+
+    def on_message_begin(self, func):
+        """
+        Install a callback that receives (name, type, seqid)
+        after the message header is read.
+        """
         pass
 
 
diff --git a/lib/py/src/server/THttpServer.py b/lib/py/src/server/THttpServer.py
index 85cf400..47e817d 100644
--- a/lib/py/src/server/THttpServer.py
+++ b/lib/py/src/server/THttpServer.py
@@ -21,6 +21,7 @@ import ssl
 
 from six.moves import BaseHTTPServer
 
+from thrift.Thrift import TMessageType
 from thrift.server import TServer
 from thrift.transport import TTransport
 
@@ -32,7 +33,9 @@ class ResponseException(Exception):
     to override this behavior (e.g., to simulate a misconfigured or
     overloaded web server during testing), it can raise a ResponseException.
     The function passed to the constructor will be called with the
-    RequestHandler as its only argument.
+    RequestHandler as its only argument.  Note that this is irrelevant
+    for ONEWAY requests, as the HTTP response must be sent before the
+    RPC is processed.
     """
     def __init__(self, handler):
         self.handler = handler
@@ -43,6 +46,9 @@ class THttpServer(TServer.TServer):
 
     This class is not very performant, but it is useful (for example) for
     acting as a mock version of an Apache-based PHP Thrift endpoint.
+    Also important to note the HTTP implementation pretty much violates the
+    transport/protocol/processor/server layering, by performing the transport
+    functions here.  This means things like oneway handling are oddly exposed.
     """
     def __init__(self,
                  processor,
@@ -68,26 +74,45 @@ class THttpServer(TServer.TServer):
                                  inputProtocolFactory, outputProtocolFactory)
 
         thttpserver = self
+        self._replied = None
 
         class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
             def do_POST(self):
                 # Don't care about the request path.
-                itrans = TTransport.TFileObjectTransport(self.rfile)
-                otrans = TTransport.TFileObjectTransport(self.wfile)
+                thttpserver._replied = False
+                iftrans = TTransport.TFileObjectTransport(self.rfile)
                 itrans = TTransport.TBufferedTransport(
-                    itrans, int(self.headers['Content-Length']))
+                    iftrans, int(self.headers['Content-Length']))
                 otrans = TTransport.TMemoryBuffer()
                 iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
                 oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
                 try:
+                    thttpserver.processor.on_message_begin(self.on_begin)
                     thttpserver.processor.process(iprot, oprot)
                 except ResponseException as exn:
                     exn.handler(self)
                 else:
+                    if not thttpserver._replied:
+                        # If the request was ONEWAY we would have replied 
already
+                        data = otrans.getvalue()
+                        self.send_response(200)
+                        self.send_header("Content-Length", len(data))
+                        self.send_header("Content-Type", 
"application/x-thrift")
+                        self.end_headers()
+                        self.wfile.write(data)
+
+            def on_begin(self, name, type, seqid):
+                """
+                Inspect the message header.
+
+                This allows us to post an immediate transport response
+                if the request is a ONEWAY message type.
+                """
+                if type == TMessageType.ONEWAY:
                     self.send_response(200)
-                    self.send_header("content-type", "application/x-thrift")
+                    self.send_header("Content-Type", "application/x-thrift")
                     self.end_headers()
-                    self.wfile.write(otrans.getvalue())
+                    thttpserver._replied = True
 
         self.httpd = server_class(server_address, RequestHander)
 
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index dd7fb6b..5beaa58 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -452,35 +452,20 @@
   "py-hs_compact_http-ip",
   "py-hs_header_http-ip",
   "py-hs_json_http-ip",
-  "py-java_accel-binary_http-ip",
   "py-java_accel-binary_http-ip-ssl",
-  "py-java_accelc-compact_http-ip",
   "py-java_accelc-compact_http-ip-ssl",
-  "py-java_binary_http-ip",
   "py-java_binary_http-ip-ssl",
-  "py-java_compact_http-ip",
   "py-java_compact_http-ip-ssl",
-  "py-java_json_http-ip",
   "py-java_json_http-ip-ssl",
-  "py-java_multi-binary_http-ip",
   "py-java_multi-binary_http-ip-ssl",
-  "py-java_multi_http-ip",
   "py-java_multi_http-ip-ssl",
-  "py-java_multia-binary_http-ip",
   "py-java_multia-binary_http-ip-ssl",
-  "py-java_multia-multi_http-ip",
   "py-java_multia-multi_http-ip-ssl",
-  "py-java_multiac-compact_http-ip",
   "py-java_multiac-compact_http-ip-ssl",
-  "py-java_multiac-multic_http-ip",
   "py-java_multiac-multic_http-ip-ssl",
-  "py-java_multic-compact_http-ip",
   "py-java_multic-compact_http-ip-ssl",
-  "py-java_multic_http-ip",
   "py-java_multic_http-ip-ssl",
-  "py-java_multij-json_http-ip",
   "py-java_multij-json_http-ip-ssl",
-  "py-java_multij_http-ip",
   "py-java_multij_http-ip-ssl",
   "py-lua_accel-binary_http-ip",
   "py-lua_accelc-compact_http-ip",
@@ -564,35 +549,20 @@
   "py3-hs_compact_http-ip",
   "py3-hs_header_http-ip",
   "py3-hs_json_http-ip",
-  "py3-java_accel-binary_http-ip",
   "py3-java_accel-binary_http-ip-ssl",
-  "py3-java_accelc-compact_http-ip",
   "py3-java_accelc-compact_http-ip-ssl",
-  "py3-java_binary_http-ip",
   "py3-java_binary_http-ip-ssl",
-  "py3-java_compact_http-ip",
   "py3-java_compact_http-ip-ssl",
-  "py3-java_json_http-ip",
   "py3-java_json_http-ip-ssl",
-  "py3-java_multi-binary_http-ip",
   "py3-java_multi-binary_http-ip-ssl",
-  "py3-java_multi_http-ip",
   "py3-java_multi_http-ip-ssl",
-  "py3-java_multia-binary_http-ip",
   "py3-java_multia-binary_http-ip-ssl",
-  "py3-java_multia-multi_http-ip",
   "py3-java_multia-multi_http-ip-ssl",
-  "py3-java_multiac-compact_http-ip",
   "py3-java_multiac-compact_http-ip-ssl",
-  "py3-java_multiac-multic_http-ip",
   "py3-java_multiac-multic_http-ip-ssl",
-  "py3-java_multic-compact_http-ip",
   "py3-java_multic-compact_http-ip-ssl",
-  "py3-java_multic_http-ip",
   "py3-java_multic_http-ip-ssl",
-  "py3-java_multij-json_http-ip",
   "py3-java_multij-json_http-ip-ssl",
-  "py3-java_multij_http-ip",
   "py3-java_multij_http-ip-ssl",
   "py3-lua_accel-binary_http-ip",
   "py3-lua_accelc-compact_http-ip",
@@ -613,4 +583,4 @@
   "rb-cpp_json_framed-domain",
   "rb-cpp_json_framed-ip",
   "rb-cpp_json_framed-ip-ssl"
-]
\ No newline at end of file
+]

Reply via email to