This is an automated email from the ASF dual-hosted git repository.
jking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git
The following commit(s) were added to refs/heads/master by this push:
new 393f6c9 THRIFT-3877: fix py/py3 server, java client with http
transport
393f6c9 is described below
commit 393f6c93e1a65b7be74d79b5a6b00f878e88a630
Author: James E. King III <[email protected]>
AuthorDate: Sat Feb 9 10:35:44 2019 -0500
THRIFT-3877: fix py/py3 server, java client with http transport
The java TestClient asks the server to run a oneway request that
sleeps for 3 seconds. If the java TestClient sees the duration
of the call exceed one second, it fails the test. This means the
server did not participate in the "fire and forget" dynamics of
ONEWAY requests. In this case the THttpServer was processing the
RPC before sending the transport response. The fix was to enhance
the TProcessor so that the THttpServer has an opportunity to inspect
the message header before processing the RPC.
This is partly due to THttpServer violating the layered
architecture. It essentially implements a combined server and
transport, whereas there should be a distinct separation of
server, protocol, and transport. Many languages seem to have
this problem where HTTP was introduced.
---
compiler/cpp/src/thrift/generate/t_py_generator.cc | 11 +++++++
.../test/org/apache/thrift/test/TestClient.java | 11 +++++--
lib/py/src/TMultiplexedProcessor.py | 4 +++
lib/py/src/Thrift.py | 12 +++++++
lib/py/src/server/THttpServer.py | 37 ++++++++++++++++++----
test/known_failures_Linux.json | 32 +------------------
6 files changed, 67 insertions(+), 40 deletions(-)
diff --git a/compiler/cpp/src/thrift/generate/t_py_generator.cc
b/compiler/cpp/src/thrift/generate/t_py_generator.cc
index c16d6d3..83462f4 100644
--- a/compiler/cpp/src/thrift/generate/t_py_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_py_generator.cc
@@ -1831,6 +1831,13 @@ void t_py_generator::generate_service_server(t_service*
tservice) {
f_service_ << indent() << "self._processMap[\"" << (*f_iter)->get_name()
<< "\"] = Processor.process_" << (*f_iter)->get_name() << endl;
}
+ f_service_ << indent() << "self._on_message_begin = None" << endl;
+ indent_down();
+ f_service_ << endl;
+
+ f_service_ << indent() << "def on_message_begin(self, func):" << endl;
+ indent_up();
+ f_service_ << indent() << "self._on_message_begin = func" << endl;
indent_down();
f_service_ << endl;
@@ -1839,6 +1846,10 @@ void t_py_generator::generate_service_server(t_service*
tservice) {
indent_up();
f_service_ << indent() << "(name, type, seqid) = iprot.readMessageBegin()"
<< endl;
+ f_service_ << indent() << "if self._on_message_begin:" << endl;
+ indent_up();
+ f_service_ << indent() << "self._on_message_begin(name, type, seqid)" <<
endl;
+ indent_down();
// TODO(mcslee): validate message
diff --git a/lib/java/test/org/apache/thrift/test/TestClient.java
b/lib/java/test/org/apache/thrift/test/TestClient.java
index feaa972..84410ce 100644
--- a/lib/java/test/org/apache/thrift/test/TestClient.java
+++ b/lib/java/test/org/apache/thrift/test/TestClient.java
@@ -752,13 +752,18 @@ public class TestClient {
testClient.testOneway(3);
long onewayElapsedMillis = (System.nanoTime() - startOneway) / 1000000;
if (onewayElapsedMillis > 200) {
- System.out.println("Oneway test failed: took " +
+ System.out.println("Oneway test took too long to execute failed:
took " +
Long.toString(onewayElapsedMillis) +
"ms");
- System.out.printf("*** FAILURE ***\n");
+ System.out.println("oneway calls are 'fire and forget' and therefore
should not cause blocking.");
+ System.out.println("Some transports (HTTP) have a required response,
and typically this failure");
+ System.out.println("means the transport response was delayed until
after the execution");
+ System.out.println("of the RPC. The server should post the
transport response immediately and");
+ System.out.println("before executing the RPC.");
+ System.out.println("*** FAILURE ***");
returnCode |= ERR_BASETYPES;
} else {
- System.out.println("Success - took " +
+ System.out.println("Success - fire and forget only took " +
Long.toString(onewayElapsedMillis) +
"ms");
}
diff --git a/lib/py/src/TMultiplexedProcessor.py
b/lib/py/src/TMultiplexedProcessor.py
index bd10d9b..ff88430 100644
--- a/lib/py/src/TMultiplexedProcessor.py
+++ b/lib/py/src/TMultiplexedProcessor.py
@@ -39,6 +39,10 @@ class TMultiplexedProcessor(TProcessor):
def registerProcessor(self, serviceName, processor):
self.services[serviceName] = processor
+ def on_message_begin(self, func):
+ for key in self.services.keys():
+ self.services[key].on_message_begin(func)
+
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
if type != TMessageType.CALL and type != TMessageType.ONEWAY:
diff --git a/lib/py/src/Thrift.py b/lib/py/src/Thrift.py
index 00941d8..c390cbb 100644
--- a/lib/py/src/Thrift.py
+++ b/lib/py/src/Thrift.py
@@ -72,6 +72,18 @@ class TProcessor(object):
"""Base class for processor, which works on two streams."""
def process(self, iprot, oprot):
+ """
+ Process a request. The normal behavior is to have the
+ processor invoke the correct handler and then it is the
+ server's responsibility to write the response to oprot.
+ """
+ pass
+
+ def on_message_begin(self, func):
+ """
+ Install a callback that receives (name, type, seqid)
+ after the message header is read.
+ """
pass
diff --git a/lib/py/src/server/THttpServer.py b/lib/py/src/server/THttpServer.py
index 85cf400..47e817d 100644
--- a/lib/py/src/server/THttpServer.py
+++ b/lib/py/src/server/THttpServer.py
@@ -21,6 +21,7 @@ import ssl
from six.moves import BaseHTTPServer
+from thrift.Thrift import TMessageType
from thrift.server import TServer
from thrift.transport import TTransport
@@ -32,7 +33,9 @@ class ResponseException(Exception):
to override this behavior (e.g., to simulate a misconfigured or
overloaded web server during testing), it can raise a ResponseException.
The function passed to the constructor will be called with the
- RequestHandler as its only argument.
+ RequestHandler as its only argument. Note that this is irrelevant
+ for ONEWAY requests, as the HTTP response must be sent before the
+ RPC is processed.
"""
def __init__(self, handler):
self.handler = handler
@@ -43,6 +46,9 @@ class THttpServer(TServer.TServer):
This class is not very performant, but it is useful (for example) for
acting as a mock version of an Apache-based PHP Thrift endpoint.
+ Also important to note the HTTP implementation pretty much violates the
+ transport/protocol/processor/server layering, by performing the transport
+ functions here. This means things like oneway handling are oddly exposed.
"""
def __init__(self,
processor,
@@ -68,26 +74,45 @@ class THttpServer(TServer.TServer):
inputProtocolFactory, outputProtocolFactory)
thttpserver = self
+ self._replied = None
class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
# Don't care about the request path.
- itrans = TTransport.TFileObjectTransport(self.rfile)
- otrans = TTransport.TFileObjectTransport(self.wfile)
+ thttpserver._replied = False
+ iftrans = TTransport.TFileObjectTransport(self.rfile)
itrans = TTransport.TBufferedTransport(
- itrans, int(self.headers['Content-Length']))
+ iftrans, int(self.headers['Content-Length']))
otrans = TTransport.TMemoryBuffer()
iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
try:
+ thttpserver.processor.on_message_begin(self.on_begin)
thttpserver.processor.process(iprot, oprot)
except ResponseException as exn:
exn.handler(self)
else:
+ if not thttpserver._replied:
+ # If the request was ONEWAY we would have replied
already
+ data = otrans.getvalue()
+ self.send_response(200)
+ self.send_header("Content-Length", len(data))
+ self.send_header("Content-Type",
"application/x-thrift")
+ self.end_headers()
+ self.wfile.write(data)
+
+ def on_begin(self, name, type, seqid):
+ """
+ Inspect the message header.
+
+ This allows us to post an immediate transport response
+ if the request is a ONEWAY message type.
+ """
+ if type == TMessageType.ONEWAY:
self.send_response(200)
- self.send_header("content-type", "application/x-thrift")
+ self.send_header("Content-Type", "application/x-thrift")
self.end_headers()
- self.wfile.write(otrans.getvalue())
+ thttpserver._replied = True
self.httpd = server_class(server_address, RequestHander)
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index dd7fb6b..5beaa58 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -452,35 +452,20 @@
"py-hs_compact_http-ip",
"py-hs_header_http-ip",
"py-hs_json_http-ip",
- "py-java_accel-binary_http-ip",
"py-java_accel-binary_http-ip-ssl",
- "py-java_accelc-compact_http-ip",
"py-java_accelc-compact_http-ip-ssl",
- "py-java_binary_http-ip",
"py-java_binary_http-ip-ssl",
- "py-java_compact_http-ip",
"py-java_compact_http-ip-ssl",
- "py-java_json_http-ip",
"py-java_json_http-ip-ssl",
- "py-java_multi-binary_http-ip",
"py-java_multi-binary_http-ip-ssl",
- "py-java_multi_http-ip",
"py-java_multi_http-ip-ssl",
- "py-java_multia-binary_http-ip",
"py-java_multia-binary_http-ip-ssl",
- "py-java_multia-multi_http-ip",
"py-java_multia-multi_http-ip-ssl",
- "py-java_multiac-compact_http-ip",
"py-java_multiac-compact_http-ip-ssl",
- "py-java_multiac-multic_http-ip",
"py-java_multiac-multic_http-ip-ssl",
- "py-java_multic-compact_http-ip",
"py-java_multic-compact_http-ip-ssl",
- "py-java_multic_http-ip",
"py-java_multic_http-ip-ssl",
- "py-java_multij-json_http-ip",
"py-java_multij-json_http-ip-ssl",
- "py-java_multij_http-ip",
"py-java_multij_http-ip-ssl",
"py-lua_accel-binary_http-ip",
"py-lua_accelc-compact_http-ip",
@@ -564,35 +549,20 @@
"py3-hs_compact_http-ip",
"py3-hs_header_http-ip",
"py3-hs_json_http-ip",
- "py3-java_accel-binary_http-ip",
"py3-java_accel-binary_http-ip-ssl",
- "py3-java_accelc-compact_http-ip",
"py3-java_accelc-compact_http-ip-ssl",
- "py3-java_binary_http-ip",
"py3-java_binary_http-ip-ssl",
- "py3-java_compact_http-ip",
"py3-java_compact_http-ip-ssl",
- "py3-java_json_http-ip",
"py3-java_json_http-ip-ssl",
- "py3-java_multi-binary_http-ip",
"py3-java_multi-binary_http-ip-ssl",
- "py3-java_multi_http-ip",
"py3-java_multi_http-ip-ssl",
- "py3-java_multia-binary_http-ip",
"py3-java_multia-binary_http-ip-ssl",
- "py3-java_multia-multi_http-ip",
"py3-java_multia-multi_http-ip-ssl",
- "py3-java_multiac-compact_http-ip",
"py3-java_multiac-compact_http-ip-ssl",
- "py3-java_multiac-multic_http-ip",
"py3-java_multiac-multic_http-ip-ssl",
- "py3-java_multic-compact_http-ip",
"py3-java_multic-compact_http-ip-ssl",
- "py3-java_multic_http-ip",
"py3-java_multic_http-ip-ssl",
- "py3-java_multij-json_http-ip",
"py3-java_multij-json_http-ip-ssl",
- "py3-java_multij_http-ip",
"py3-java_multij_http-ip-ssl",
"py3-lua_accel-binary_http-ip",
"py3-lua_accelc-compact_http-ip",
@@ -613,4 +583,4 @@
"rb-cpp_json_framed-domain",
"rb-cpp_json_framed-ip",
"rb-cpp_json_framed-ip-ssl"
-]
\ No newline at end of file
+]