This is an automated email from the ASF dual-hosted git repository.

yuxuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db7a0a  THRIFT-5240: Do connectivity check in Go server
4db7a0a is described below

commit 4db7a0af13ac9614e3e9758d42b2791040f4dc7e
Author: Yuxuan 'fishy' Wang <[email protected]>
AuthorDate: Sat Jun 27 10:13:34 2020 -0700

    THRIFT-5240: Do connectivity check in Go server
    
    Client: go
    
    In compiler generated TProcessorFunction implementations, add a
    goroutine after read the request to do connectivity check on the input
    transport. If the transport is no longer open, cancel the context object
    passed into the handler implementation.
    
    Also define ErrAbandonRequest error, to help TSimpleServer closing
    client connections that's already closed on the other end.
---
 CHANGES.md                                         |  1 +
 compiler/cpp/src/thrift/generate/t_go_generator.cc | 69 ++++++++++++++++++++--
 lib/go/README.md                                   | 27 +++++++++
 lib/go/thrift/simple_server.go                     | 25 ++++++++
 4 files changed, 118 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index b6c2021..be0286a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@
 - [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add 
ClientMiddleware function type and WrapClient function to support wrapping a 
TClient with middleware functions.
 - [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add 
ProcessorMiddleware function type and WrapProcessor function to support 
wrapping a TProcessor with middleware functions.
 - [THRIFT-5233](https://issues.apache.org/jira/browse/THRIFT-5233) - Add 
context deadline check to ReadMessageBegin in TBinaryProtocol, 
TCompactProtocol, and THeaderProtocol.
+- [THRIFT-5240](https://issues.apache.org/jira/browse/THRIFT-5240) - The 
context passed into server handler implementations will be canceled when we 
detected that the client closed the connection.
 
 ## 0.13.0
 
diff --git a/compiler/cpp/src/thrift/generate/t_go_generator.cc 
b/compiler/cpp/src/thrift/generate/t_go_generator.cc
index 6f73819..3bb2a5c 100644
--- a/compiler/cpp/src/thrift/generate/t_go_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_go_generator.cc
@@ -919,6 +919,7 @@ string t_go_generator::go_imports_begin(bool consts) {
     system_packages.push_back("errors");
   }
   system_packages.push_back("fmt");
+  system_packages.push_back("time");
   system_packages.push_back(gen_thrift_import_);
   return "import(\n" + render_system_packages(system_packages);
 }
@@ -937,6 +938,7 @@ string t_go_generator::go_imports_end() {
       "var _ = fmt.Printf\n"
       "var _ = context.Background\n"
       "var _ = reflect.DeepEqual\n"
+      "var _ = time.Now\n"
       "var _ = bytes.Equal\n\n");
 }
 
@@ -2778,15 +2780,66 @@ void 
t_go_generator::generate_process_function(t_service* tservice, t_function*
     f_types_ << indent() << "  oprot.Flush(ctx)" << endl;
   }
   f_types_ << indent() << "  return false, err" << endl;
-  f_types_ << indent() << "}" << endl << endl;
-  f_types_ << indent() << "iprot.ReadMessageEnd(ctx)" << endl;
+  f_types_ << indent() << "}" << endl;
+  f_types_ << indent() << "iprot.ReadMessageEnd(ctx)" << endl << endl;
+
+  // Even though we never create the goroutine in oneway handlers,
+  // always have (nop) tickerCancel defined makes the writing part of code
+  // generating easier and less error-prone.
+  f_types_ << indent() << "tickerCancel := func() {}" << endl;
+  // Only create the goroutine for non-oneways.
+  if (!tfunction->is_oneway()) {
+    f_types_ << indent() << "// Start a goroutine to do server side 
connectivity check." << endl;
+    f_types_ << indent() << "if thrift.ServerConnectivityCheckInterval > 0 {" 
<< endl;
+
+    indent_up();
+    f_types_ << indent() << "var cancel context.CancelFunc" << endl;
+    f_types_ << indent() << "ctx, cancel = context.WithCancel(ctx)" << endl;
+    f_types_ << indent() << "defer cancel()" << endl;
+    f_types_ << indent() << "var tickerCtx context.Context" << endl;
+    f_types_ << indent() << "tickerCtx, tickerCancel = 
context.WithCancel(context.Background())" << endl;
+    f_types_ << indent() << "defer tickerCancel()" << endl;
+    f_types_ << indent() << "go func(ctx context.Context, cancel 
context.CancelFunc) {" << endl;
+
+    indent_up();
+    f_types_ << indent() << "ticker := 
time.NewTicker(thrift.ServerConnectivityCheckInterval)" << endl;
+    f_types_ << indent() << "defer ticker.Stop()" << endl;
+    f_types_ << indent() << "for {" << endl;
+
+    indent_up();
+    f_types_ << indent() << "select {" << endl;
+    f_types_ << indent() << "case <-ctx.Done():" << endl;
+    indent_up();
+    f_types_ << indent() << "return" << endl;
+    indent_down();
+    f_types_ << indent() << "case <-ticker.C:" << endl;
+
+    indent_up();
+    f_types_ << indent() << "if !iprot.Transport().IsOpen() {" << endl;
+    indent_up();
+    f_types_ << indent() << "cancel()" << endl;
+    f_types_ << indent() << "return" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl;
+    indent_down();
+    f_types_ << indent() << "}(tickerCtx, cancel)" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl << endl;
+  } else {
+    // Make sure we don't get the defined but unused compiling error.
+    f_types_ << indent() << "_ = tickerCancel" << endl << endl;
+  }
 
   if (!tfunction->is_oneway()) {
     f_types_ << indent() << "result := " << resultname << "{}" << endl;
   }
   bool need_reference = type_need_reference(tfunction->get_returntype());
   if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
-    f_types_ << "var retval " << type_to_go_type(tfunction->get_returntype()) 
<< endl;
+    f_types_ << indent() << "var retval " << 
type_to_go_type(tfunction->get_returntype()) << endl;
   }
 
   f_types_ << indent() << "var err2 error" << endl;
@@ -2818,6 +2871,7 @@ void t_go_generator::generate_process_function(t_service* 
tservice, t_function*
   }
 
   f_types_ << "); err2 != nil {" << endl;
+  f_types_ << indent() << "  tickerCancel()" << endl;
 
   t_struct* exceptions = tfunction->get_xceptions();
   const vector<t_field*>& x_fields = exceptions->get_members();
@@ -2836,6 +2890,11 @@ void 
t_go_generator::generate_process_function(t_service* tservice, t_function*
   }
 
   if (!tfunction->is_oneway()) {
+    // Avoid writing the error to the wire if it's ErrAbandonRequest
+    f_types_ << indent() << "  if err2 == thrift.ErrAbandonRequest {" << endl;
+    f_types_ << indent() << "    return false, err2" << endl;
+    f_types_ << indent() << "  }" << endl;
+
     f_types_ << indent() << "  x := 
thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "
                               "\"Internal error processing " << 
escape_string(tfunction->get_name())
                << ": \" + err2.Error())" << endl;
@@ -2864,10 +2923,11 @@ void 
t_go_generator::generate_process_function(t_service* tservice, t_function*
       }
       f_types_ << "retval" << endl;
       indent_down();
-      f_types_ << "}" << endl;
+      f_types_ << indent() << "}" << endl;
     } else {
       f_types_ << endl;
     }
+    f_types_ << indent() << "tickerCancel()" << endl;
     f_types_ << indent() << "if err2 = oprot.WriteMessageBegin(ctx, \""
                << escape_string(tfunction->get_name()) << "\", thrift.REPLY, 
seqId); err2 != nil {"
                << endl;
@@ -2889,6 +2949,7 @@ void t_go_generator::generate_process_function(t_service* 
tservice, t_function*
     f_types_ << indent() << "return true, err" << endl;
   } else {
     f_types_ << endl;
+    f_types_ << indent() << "tickerCancel()" << endl;
     f_types_ << indent() << "return true, nil" << endl;
   }
   indent_down();
diff --git a/lib/go/README.md b/lib/go/README.md
index ce6d5ed..5b7e2cd 100644
--- a/lib/go/README.md
+++ b/lib/go/README.md
@@ -81,3 +81,30 @@ which will generate:
     type Foo struct {
       Bar string `thrift:"bar,1,required" some_tag:"some_tag_value"`
     }
+
+A note about server handler implementations
+===========================================
+
+The context object passed into the server handler function will be canceled 
when
+the client closes the connection (this is a best effort check, not a guarantee
+-- there's no guarantee that the context object is always canceled when client
+closes the connection, but when it's canceled you can always assume the client
+closed the connection). When implementing Go Thrift server, you can take
+advantage of that to abandon requests that's no longer needed:
+
+    func MyEndpoint(ctx context.Context, req *thriftRequestType) 
(*thriftResponseType, error) {
+        ...
+        if ctx.Err() == context.Canceled {
+            return nil, thrift.ErrAbandonRequest
+        }
+        ...
+    }
+
+This feature would add roughly 1 millisecond of latency overhead to the server
+handlers (along with roughly 2 goroutines per request).
+If that is unacceptable, it can be disabled by having this line early in your
+main function:
+
+    thrift.ServerConnectivityCheckInterval = 0
+
+This feature is also only enabled on non-oneway endpoints.
diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go
index 85baa4e..68ac394 100644
--- a/lib/go/thrift/simple_server.go
+++ b/lib/go/thrift/simple_server.go
@@ -20,12 +20,34 @@
 package thrift
 
 import (
+       "errors"
        "fmt"
        "io"
        "sync"
        "sync/atomic"
+       "time"
 )
 
+// ErrAbandonRequest is a special error server handler implementations can
+// return to indicate that the request has been abandoned.
+//
+// TSimpleServer will check for this error, and close the client connection
+// instead of writing the response/error back to the client.
+//
+// It shall only be used when the server handler implementation know that the
+// client already abandoned the request (by checking that the passed in context
+// is already canceled, for example).
+var ErrAbandonRequest = errors.New("request abandoned")
+
+// ServerConnectivityCheckInterval defines the ticker interval used by
+// connectivity check in thrift compiled TProcessorFunc implementations.
+//
+// It's defined as a variable instead of constant, so that thrift server
+// implementations can change its value to control the behavior.
+//
+// If it's changed to <=0, the feature will be disabled.
+var ServerConnectivityCheckInterval = time.Millisecond
+
 /*
  * This is not a typical TSimpleServer as it is not blocked after accept a 
socket.
  * It is more like a TThreadedServer that can handle different connections in 
different goroutines.
@@ -293,6 +315,9 @@ func (p *TSimpleServer) processRequests(client TTransport) 
(err error) {
                }
 
                ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
+               if err == ErrAbandonRequest {
+                       return client.Close()
+               }
                if _, ok := err.(TTransportException); ok && err != nil {
                        return err
                }

Reply via email to