This is an automated email from the ASF dual-hosted git repository.

jpeach pushed a commit to branch master
in repository https://git-dual.apache.org/repos/asf/trafficserver.git

The following commit(s) were added to refs/heads/master by this push:
       new  a72ba96   TS-4932: Add a passthru example plugin.
a72ba96 is described below

commit a72ba96eae2c3833ab6832de5d0945ce990b69b1
Author: James Peach <[email protected]>
AuthorDate: Mon Oct 3 10:36:35 2016 -0700

    TS-4932: Add a passthru example plugin.
    
    This is a protocol plugin example that demonstrates how to proxy
    traffic from a custom port to an internal request using
    TSHttpConnectWithPluginId().
---
 example/Makefile.am          |   6 +-
 example/passthru/passthru.cc | 352 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 356 insertions(+), 2 deletions(-)

diff --git a/example/Makefile.am b/example/Makefile.am
index 86349d7..045b11c 100644
--- a/example/Makefile.am
+++ b/example/Makefile.am
@@ -32,6 +32,7 @@ plugins = \
   lifecycle-plugin.la \
   null-transform.la \
   output-header.la \
+  passthru.la \
   protocol-stack.la \
   protocol.la \
   psi.la \
@@ -41,6 +42,7 @@ plugins = \
   replace-header.la \
   response-header-1.la \
   secure-link.la \
+  server-push.la \
   server-transform.la \
   ssl-preaccept.la \
   ssl-sni-whitelist.la \
@@ -48,7 +50,6 @@ plugins = \
   statistic.la \
   thread-1.la \
   txn-data-sink.la \
-  server-push.la \
   version.la
 
 if BUILD_EXAMPLE_PLUGINS
@@ -70,6 +71,7 @@ intercept_la_SOURCES = intercept/intercept.cc
 lifecycle_plugin_la_SOURCES = lifecycle-plugin/lifecycle-plugin.c
 null_transform_la_SOURCES = null-transform/null-transform.c
 output_header_la_SOURCES = output-header/output-header.c
+passthru_la_SOURCES = passthru/passthru.cc
 protocol_la_SOURCES = protocol/Protocol.c protocol/TxnSM.c
 protocol_stack_la_SOURCES = protocol-stack/protocol-stack.cc
 psi_la_SOURCES = thread-pool/psi.c thread-pool/thread.c
@@ -79,6 +81,7 @@ remap_la_SOURCES = remap/remap.cc
 replace_header_la_SOURCES = replace-header/replace-header.c
 response_header_1_la_SOURCES = response-header-1/response-header-1.c
 secure_link_la_SOURCES = secure-link/secure-link.c
+server_push_la_SOURCES = server-push/server-push.c
 server_transform_la_SOURCES = server-transform/server-transform.c
 ssl_preaccept_la_SOURCES = ssl-preaccept/ssl-preaccept.cc
 ssl_sni_la_SOURCES = ssl-sni/ssl-sni.cc
@@ -87,7 +90,6 @@ statistic_la_SOURCES = statistic/statistic.cc
 thread_1_la_SOURCES = thread-1/thread-1.c
 txn_data_sink_la_SOURCES = txn-data-sink/txn-data-sink.c
 version_la_SOURCES = version/version.c
-server_push_la_SOURCES = server-push/server-push.c
 
 # The following examples do not build:
 #
diff --git a/example/passthru/passthru.cc b/example/passthru/passthru.cc
new file mode 100644
index 0000000..3a693dc
--- /dev/null
+++ b/example/passthru/passthru.cc
@@ -0,0 +1,352 @@
+/** @passthru.cc
+ *
+ *  Example protocol plugin.
+ *
+ *  @section license License
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/*
+ * Passthru plugin.
+ *
+ * This plugin demonstrates:
+ *
+ *  - Using TSMgmtStringCreate() to add custom records into records.config.
+ *  - Listening on a custom socket with TSPortDescriptorAccept().
+ *  - Using TSHttpConnectWithPluginId() and the VConn API to proxy HTTP 
traffic.
+ */
+
+#include <ts/ts.h>
+#include <inttypes.h>
+#include <string.h>
+
+#define PLUGIN_NAME "passthru"
+
+#define PassthruSessionDebug(sp, fmt, ...)                 \
+  do {                                                     \
+    TSDebug(PLUGIN_NAME, "sp=%p " fmt, sp, ##__VA_ARGS__); \
+  } while (0)
+
+static int PassthruSessionEvent(TSCont cont, TSEvent event, void *edata);
+
+union EventArgument {
+  void *edata;
+  TSVConn vconn;
+  TSVIO vio;
+
+  EventArgument(void *_p) : edata(_p) {}
+};
+
+struct PassthruIO {
+  TSVIO vio;
+  TSIOBuffer iobuf;
+  TSIOBufferReader reader;
+
+  PassthruIO() : vio(NULL), iobuf(NULL), reader(NULL) {}
+  ~PassthruIO() { clear(); }
+  void
+  clear()
+  {
+    if (this->reader) {
+      TSIOBufferReaderFree(this->reader);
+    }
+
+    if (this->iobuf) {
+      TSIOBufferDestroy(this->iobuf);
+    }
+
+    this->reader = nullptr;
+    this->iobuf  = nullptr;
+    this->vio    = nullptr;
+  }
+
+  // Start a read operation.
+  void
+  read(TSVConn vconn, TSCont contp)
+  {
+    TSReleaseAssert(this->vio == NULL);
+
+    TSReleaseAssert((this->iobuf = TSIOBufferCreate()));
+    TSReleaseAssert((this->reader = TSIOBufferReaderAlloc(this->iobuf)));
+
+    this->vio = TSVConnRead(vconn, contp, this->iobuf, INT64_MAX);
+  }
+
+  // Start a write operation.
+  void
+  write(TSVConn vconn, TSCont contp)
+  {
+    TSReleaseAssert(this->vio == NULL);
+
+    TSReleaseAssert((this->iobuf = TSIOBufferCreate()));
+    TSReleaseAssert((this->reader = TSIOBufferReaderAlloc(this->iobuf)));
+
+    this->vio = TSVConnWrite(vconn, contp, this->reader, INT64_MAX);
+  }
+
+  // Transfer data from this IO object to the target IO object.
+  int64_t
+  transfer_to(PassthruIO &to)
+  {
+    TSIOBufferBlock block;
+    int64_t consumed = 0;
+
+    for (block = TSIOBufferReaderStart(this->reader); block; block = 
TSIOBufferBlockNext(block)) {
+      int64_t remain = 0;
+      const char *bytes;
+
+      bytes = TSIOBufferBlockReadStart(block, this->reader, &remain);
+      while (bytes && remain) {
+        int64_t nwritten;
+
+        nwritten = TSIOBufferWrite(to.iobuf, bytes, remain);
+        remain -= nwritten;
+        bytes += nwritten;
+        consumed += nwritten;
+      }
+    }
+
+    if (consumed) {
+      TSIOBufferReaderConsume(this->reader, consumed);
+    }
+
+    return consumed;
+  }
+
+private:
+  PassthruIO(const PassthruIO &) = delete;
+  PassthruIO &operator=(const PassthruIO &) = delete;
+};
+
+struct PassthruSession {
+  // VC session to the client.
+  struct {
+    TSVConn vconn;
+    PassthruIO readio;
+    PassthruIO writeio;
+  } client;
+
+  // VC session to Traffic Server via TSHttpConnect.
+  struct {
+    TSVConn vconn;
+    PassthruIO readio;
+    PassthruIO writeio;
+  } server;
+
+  TSCont contp;
+
+  PassthruSession() : contp(TSContCreate(PassthruSessionEvent, 
TSMutexCreate()))
+  {
+    this->client.vconn = this->server.vconn = nullptr;
+    TSContDataSet(this->contp, this);
+  }
+
+  ~PassthruSession()
+  {
+    if (this->server.vconn) {
+      TSVConnClose(this->server.vconn);
+    }
+
+    if (this->client.vconn) {
+      TSVConnClose(this->client.vconn);
+    }
+
+    TSContDataSet(this->contp, nullptr);
+    TSContDestroy(this->contp);
+
+    PassthruSessionDebug(this, "destroyed session");
+  }
+
+private:
+  PassthruSession(const PassthruSession &) = delete;
+  PassthruSession &operator=(const PassthruSession &) = delete;
+};
+
+static bool
+PassthruSessionIsFinished(PassthruSession *sp)
+{
+  int64_t avail = TSIOBufferReaderAvail(sp->client.writeio.reader);
+
+  // We should shut down the session if we don't have a server vconn
+  // (either it was never started, or it was closed), and we have drained
+  // the client write buffer.
+  if (sp->server.vconn == nullptr && avail == 0) {
+    return true;
+  }
+
+  PassthruSessionDebug(sp, "continuing session with %" PRId64 " buffered 
client bytes", avail);
+  return false;
+}
+
+static int
+PassthruSessionEvent(TSCont cont, TSEvent event, void *edata)
+{
+  EventArgument arg(edata);
+  PassthruSession *sp = (PassthruSession *)TSContDataGet(cont);
+
+  PassthruSessionDebug(sp, "session event on vconn=%p event=%d (%s)", 
TSVIOVConnGet(arg.vio), event, TSHttpEventNameLookup(event));
+
+  if (event == TS_EVENT_VCONN_READ_READY) {
+    // On the first read, wire up the internal transfer to the server.
+    if (sp->server.vconn == nullptr) {
+      sp->server.vconn = 
TSHttpConnectWithPluginId(TSNetVConnRemoteAddrGet(sp->client.vconn), 
PLUGIN_NAME, 0);
+
+      // Start the server end of the IO before we write any data.
+      sp->server.readio.read(sp->server.vconn, sp->contp);
+      sp->server.writeio.write(sp->server.vconn, sp->contp);
+    }
+
+    if (sp->server.vconn != nullptr) {
+      int64_t nbytes;
+
+      nbytes = sp->client.readio.transfer_to(sp->server.writeio);
+      PassthruSessionDebug(sp, "proxied %" PRId64 " bytes from client vconn=%p 
to server vconn=%p", nbytes, sp->client.vconn,
+                           sp->server.vconn);
+      if (nbytes) {
+        TSVIOReenable(sp->client.readio.vio);
+        TSVIOReenable(sp->server.writeio.vio);
+      }
+
+      nbytes = sp->server.readio.transfer_to(sp->client.writeio);
+      PassthruSessionDebug(sp, "proxied %" PRId64 " bytes from server vconn=%p 
to client vconn=%p", nbytes, sp->server.vconn,
+                           sp->client.vconn);
+      if (nbytes) {
+        TSVIOReenable(sp->server.readio.vio);
+        TSVIOReenable(sp->client.writeio.vio);
+      }
+    }
+
+    if (PassthruSessionIsFinished(sp)) {
+      delete sp;
+      return TS_EVENT_NONE;
+    }
+
+    TSVIOReenable(arg.vio);
+    return TS_EVENT_NONE;
+  }
+
+  if (event == TS_EVENT_VCONN_WRITE_READY) {
+    if (PassthruSessionIsFinished(sp)) {
+      delete sp;
+      return TS_EVENT_NONE;
+    }
+
+    return TS_EVENT_NONE;
+  }
+
+  if (event == TS_EVENT_VCONN_EOS) {
+    // If we get EOS from the client, just abort everything; we don't
+    // care any more.
+    if (TSVIOVConnGet(arg.vio) == sp->client.vconn) {
+      PassthruSessionDebug(sp, "got EOS from client vconn=%p", 
sp->client.vconn);
+
+      delete sp;
+      return TS_EVENT_NONE;
+    }
+
+    // If we get EOS from the server, then we should make sure that we
+    // drain any outstanding data before shutting down the client.
+    if (TSVIOVConnGet(arg.vio) == sp->server.vconn) {
+      PassthruSessionDebug(sp, "EOS from server vconn=%p", sp->server.vconn);
+
+      TSReleaseAssert(sp->client.vconn != nullptr);
+
+      if (TSIOBufferReaderAvail(sp->server.readio.reader) > 0) {
+        sp->server.readio.transfer_to(sp->client.writeio);
+      }
+
+      TSVConnClose(sp->server.vconn);
+      sp->server.vconn = nullptr;
+      sp->server.readio.clear();
+      sp->server.writeio.clear();
+    }
+
+    return TS_EVENT_NONE;
+  }
+
+  TSError("%s: unexpected event %s (%d) edata=%p", PLUGIN_NAME, 
TSHttpEventNameLookup(event), event, arg.edata);
+
+  return TS_EVENT_ERROR;
+}
+
+static int
+PassthruAccept(TSCont /* cont */, TSEvent event, void *edata)
+{
+  EventArgument arg(edata);
+  PassthruSession *sp = new PassthruSession();
+
+  PassthruSessionDebug(sp, "accepting connection on vconn=%p event=%d", 
arg.vconn, event);
+  TSReleaseAssert(event == TS_EVENT_NET_ACCEPT);
+
+  // Start the client end of the IO. We delay starting the server end until
+  // we get the first read from the client end.
+  sp->client.vconn = arg.vconn;
+  sp->client.readio.read(arg.vconn, sp->contp);
+  sp->client.writeio.write(arg.vconn, sp->contp);
+
+  return TS_EVENT_NONE;
+}
+
+static TSReturnCode
+PassthruListen()
+{
+  TSMgmtString ports          = nullptr;
+  TSPortDescriptor descriptor = nullptr;
+  TSCont cont                 = nullptr;
+
+  if (TSMgmtStringGet("config.plugin.passthru.server_ports", &ports) == 
TS_ERROR) {
+    TSError("%s: missing config.plugin.passthru.server_ports configuration", 
PLUGIN_NAME);
+    return TS_ERROR;
+  }
+
+  descriptor = TSPortDescriptorParse(ports);
+  if (descriptor == nullptr) {
+    TSError("%s: failed to parse config.plugin.passthru.server_ports", 
PLUGIN_NAME);
+    TSfree(ports);
+    return TS_ERROR;
+  }
+
+  TSDebug(PLUGIN_NAME, "listening on port '%s'", ports);
+  TSfree(ports);
+
+  TSReleaseAssert(cont = TSContCreate(PassthruAccept, nullptr));
+  return TSPortDescriptorAccept(descriptor, cont);
+}
+
+void
+TSPluginInit(int /* argc */, const char * /* argv */ [])
+{
+  // clang-format off
+    TSPluginRegistrationInfo info = {
+        const_cast<char*>(PLUGIN_NAME),
+        const_cast<char*>("Apache Software Foundation"),
+        const_cast<char*>("[email protected]"),
+    };
+  // clang-format on
+
+  TSMgmtStringCreate(TS_RECORDTYPE_CONFIG, 
"config.plugin.passthru.server_ports", const_cast<char *>(""),
+                     TS_RECORDUPDATE_RESTART_TS, TS_RECORDCHECK_NULL, nullptr 
/* check_regex */, TS_RECORDACCESS_NULL);
+
+  // Start listening on the configured port.
+  TSReleaseAssert(PassthruListen() == TS_SUCCESS);
+
+  // Now that succeded, we can register.
+  TSReleaseAssert(TSPluginRegister(&info) == TS_SUCCESS);
+}
+
+// vim: set sw=2 ts=2 sts=2 et:

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to