Repository: trafficserver
Updated Branches:
  refs/heads/master d6950eb32 -> 993339627


TS-3961: Open source Yahoo's ats-multiplexer plug-in

This closes #302


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/99333962
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/99333962
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/99333962

Branch: refs/heads/master
Commit: 99333962777f81e0c654a685ad53603524b060ad
Parents: d6950eb
Author: Daniel Vitor Morilha <[email protected]>
Authored: Mon Oct 26 10:52:34 2015 -0700
Committer: Bryan Call <[email protected]>
Committed: Mon Oct 26 10:56:19 2015 -0700

----------------------------------------------------------------------
 NOTICE                                          |   3 +
 configure.ac                                    |   1 +
 plugins/experimental/Makefile.am                |   1 +
 plugins/experimental/multiplexer/Makefile.am    |  32 ++
 plugins/experimental/multiplexer/README         |  34 +++
 .../experimental/multiplexer/ats-multiplexer.cc | 161 ++++++++++
 .../experimental/multiplexer/chunk-decoder.cc   | 158 ++++++++++
 .../experimental/multiplexer/chunk-decoder.h    |  66 ++++
 plugins/experimental/multiplexer/dispatch.cc    | 243 +++++++++++++++
 plugins/experimental/multiplexer/dispatch.h     |  70 +++++
 plugins/experimental/multiplexer/fetcher.cc     |  64 ++++
 plugins/experimental/multiplexer/fetcher.h      | 303 +++++++++++++++++++
 .../multiplexer/original-request.cc             | 129 ++++++++
 .../experimental/multiplexer/original-request.h |  63 ++++
 plugins/experimental/multiplexer/post.cc        | 138 +++++++++
 plugins/experimental/multiplexer/post.h         |  43 +++
 plugins/experimental/multiplexer/remap.config   |  19 ++
 plugins/experimental/multiplexer/ts.cc          |  39 +++
 plugins/experimental/multiplexer/ts.h           |  67 ++++
 19 files changed, 1634 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 453af9b..a90017f 100644
--- a/NOTICE
+++ b/NOTICE
@@ -75,6 +75,9 @@ Copyright (C) 2012 Google Inc.
 collapsed_collapsing: Plugin for connection collapsing to origin
 Copyright (C) 2014 Yahoo! Inc.  All rights reserved.
 
+multiplexer: Plugin for request multiplixing
+Copyright (C) 2015 Yahoo! Inc.  All rights reserved.
+
 ~~~
 
 healthchecks: Plugin for ATS healthchecks.

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index b39e56e..1a5c7f6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1913,6 +1913,7 @@ AS_IF([test "x$enable_experimental_plugins" = "xyes"], [
     plugins/experimental/hipes/Makefile
     plugins/experimental/memcached_remap/Makefile
     plugins/experimental/metalink/Makefile
+    plugins/experimental/multiplexer/Makefile
     plugins/experimental/mysql_remap/Makefile
     plugins/experimental/regex_revalidate/Makefile
     plugins/experimental/remap_stats/Makefile

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/Makefile.am b/plugins/experimental/Makefile.am
index 98f8293..0ec2e37 100644
--- a/plugins/experimental/Makefile.am
+++ b/plugins/experimental/Makefile.am
@@ -32,6 +32,7 @@ SUBDIRS = \
  healthchecks \
  hipes \
  metalink \
+ multiplexer \
  memcached_remap \
  regex_revalidate \
  remap_stats \

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/Makefile.am 
b/plugins/experimental/multiplexer/Makefile.am
new file mode 100644
index 0000000..e93d45c
--- /dev/null
+++ b/plugins/experimental/multiplexer/Makefile.am
@@ -0,0 +1,32 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+include $(top_srcdir)/build/plugins.mk
+
+AM_CPPFLAGS += -DPLUGIN_TAG=\"multiplexer\"
+
+pkglib_LTLIBRARIES = multiplexer.la
+
+multiplexer_la_SOURCES = \
+  ats-multiplexer.cc \
+  chunk-decoder.cc \
+  dispatch.cc \
+  fetcher.cc \
+  original-request.cc \
+  post.cc \
+  ts.cc
+
+multiplexer_la_LDFLAGS = $(TS_PLUGIN_LDFLAGS)

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/README
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/README 
b/plugins/experimental/multiplexer/README
new file mode 100644
index 0000000..5681928
--- /dev/null
+++ b/plugins/experimental/multiplexer/README
@@ -0,0 +1,34 @@
+ATS (Apache Traffic Server) Multiplexer plug-in
+-----------------------------------------------
+
+This is a remap plug-in that allows a request to be multiplexed one or more 
times
+ and sent to different remap entries. Both headers and body (in case of POST or
+ PUT methods, only) are copied into the new requests.
+
+Multiplexer:
+ 1. adds "X-Multiplexer: original" header into client's request.
+ 2. copies client's request (bodies are copied by transforming the request)
+ 3. changes Host header of the copy according to pparam.
+ 4. changes X-Multiplexer header to "copy".
+ 5. sends the copied request with TSHttpConnect.
+
+Multiplexer dispatches the request in background without blocking the original
+ request. Multiplexed responses are drained and discarded.
+
+A global timeout can be overwritten through "multiplexer__timeout" environment 
variable representing how many nanoseconds to wait. A default 1s timeout is 
hard-coded.
+
+Please use "mutiplexer" tag for debugging purposes. While debugging, 
multiplexed requests and responses are printed into the logs.
+
+Multiplexer produces the following statistics consumed with traffic_line:
+ - failures: number of failed multiplexed requests
+ - hits: number of successful multiplexed requests
+ - requests: total number of multiplexed requests
+ - time(avg): average time taken between multiplexed requests and their 
responses
+ - timeouts: number of multiplexed requests which timed-out
+ - size(avg): average size of multiplexed responses
+
+Example remap.config:
+    map http://www.example.com/a http://www.example.com/ 
@plugin=multiplexer.so @pparam=host1.example.com
+    map http://www.example.com/b http://www.example.com/ 
@plugin=multiplexer.so @pparam=host2.example.com
+    map http://www.example.com/c http://www.example.com/ 
@plugin=multiplexer.so @pparam=host1.example.com @pparam=host2.example.com
+

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/ats-multiplexer.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/ats-multiplexer.cc 
b/plugins/experimental/multiplexer/ats-multiplexer.cc
new file mode 100644
index 0000000..adbfefe
--- /dev/null
+++ b/plugins/experimental/multiplexer/ats-multiplexer.cc
@@ -0,0 +1,161 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <algorithm>
+#include <ts/ts.h>
+#include <ts/remap.h>
+
+#include <inttypes.h>
+
+#include "dispatch.h"
+#include "fetcher.h"
+#include "original-request.h"
+#include "post.h"
+
+#ifndef PLUGIN_TAG
+#error Please define a PLUGIN_TAG before including this file.
+#endif
+
+// 1s
+const size_t DEFAULT_TIMEOUT = 1000000000000;
+
+Statistics statistics;
+
+TSReturnCode
+TSRemapInit(TSRemapInterface *, char *, int)
+{
+  {
+    timeout = 0;
+    const char *const timeoutEnv = getenv(PLUGIN_TAG "__timeout");
+    if (timeoutEnv != NULL) {
+      timeout = atol(timeoutEnv);
+    }
+    if (timeout < 1) {
+      timeout = DEFAULT_TIMEOUT;
+    }
+    TSDebug(PLUGIN_TAG, "timeout is set to: %zu", timeout);
+  }
+
+  statistics.failures = TSStatCreate(PLUGIN_TAG ".failures", 
TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT);
+
+  statistics.hits = TSStatCreate(PLUGIN_TAG ".hits", TS_RECORDDATATYPE_INT, 
TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT);
+
+  statistics.time = TSStatCreate(PLUGIN_TAG ".time", TS_RECORDDATATYPE_INT, 
TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_AVG);
+
+  statistics.requests = TSStatCreate(PLUGIN_TAG ".requests", 
TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT);
+
+  statistics.timeouts = TSStatCreate(PLUGIN_TAG ".timeouts", 
TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT);
+
+  statistics.size = TSStatCreate(PLUGIN_TAG ".size", TS_RECORDDATATYPE_INT, 
TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_AVG);
+
+  return TS_SUCCESS;
+}
+
+TSReturnCode
+TSRemapNewInstance(int argc, char **argv, void **i, char *, int)
+{
+  assert(i != NULL);
+  Instance *instance = new Instance;
+
+  if (argc > 2) {
+    std::copy(argv + 2, argv + argc, std::back_inserter(instance->origins));
+  }
+
+  *i = static_cast<void *>(instance);
+
+  return TS_SUCCESS;
+}
+
+void
+TSRemapDeleteInstance(void *i)
+{
+  assert(i != NULL);
+  delete static_cast<Instance *>(i);
+}
+
+void
+DoRemap(const Instance &i, TSHttpTxn t)
+{
+  assert(t != NULL);
+  /*
+  if (POST || PUT) {
+    transformRequest
+  }
+  */
+  TSMBuffer buffer;
+  TSMLoc location;
+
+  CHECK(TSHttpTxnClientReqGet(t, &buffer, &location));
+
+  assert(buffer != NULL);
+  assert(location != NULL);
+
+  {
+    TSMLoc field;
+
+    CHECK(TSMimeHdrFieldCreateNamed(buffer, location, "X-Multiplexer", 13, 
&field));
+    assert(field != NULL);
+
+    CHECK(TSMimeHdrFieldValueStringSet(buffer, location, field, -1, 
"original", 8));
+
+    CHECK(TSMimeHdrFieldAppend(buffer, location, field));
+  }
+
+  Requests requests;
+  generateRequests(i.origins, buffer, location, requests);
+  assert(requests.size() == i.origins.size());
+
+  int length;
+  const char *const method = TSHttpHdrMethodGet(buffer, location, &length);
+
+  TSDebug(PLUGIN_TAG, "Method is %s.", std::string(method, length).c_str());
+
+  if (length == TS_HTTP_LEN_POST && memcmp(TS_HTTP_METHOD_POST, method, 
TS_HTTP_LEN_POST) == 0) {
+    const TSVConn vconnection = TSTransformCreate(handlePost, t);
+    assert(vconnection != NULL);
+    TSContDataSet(vconnection, new PostState(requests));
+    assert(requests.empty());
+    TSHttpTxnHookAdd(t, TS_HTTP_REQUEST_TRANSFORM_HOOK, vconnection);
+  } else {
+    dispatch(requests, timeout);
+  }
+
+  TSHandleMLocRelease(buffer, TS_NULL_MLOC, location);
+
+  TSStatIntIncrement(statistics.requests, 1);
+}
+
+TSRemapStatus
+TSRemapDoRemap(void *i, TSHttpTxn t, TSRemapRequestInfo *r)
+{
+  assert(i != NULL);
+  assert(t != NULL);
+  const Instance *const instance = static_cast<const Instance *>(i);
+
+  if (!instance->origins.empty() && TSHttpTxnIsInternal(t) != TS_SUCCESS) {
+    DoRemap(*instance, t);
+  } else {
+    TSDebug(PLUGIN_TAG, "Skipping transaction %p", t);
+  }
+
+  return TSREMAP_NO_REMAP;
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/chunk-decoder.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/chunk-decoder.cc 
b/plugins/experimental/multiplexer/chunk-decoder.cc
new file mode 100644
index 0000000..113cf47
--- /dev/null
+++ b/plugins/experimental/multiplexer/chunk-decoder.cc
@@ -0,0 +1,158 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <algorithm>
+#include <assert.h>
+
+#include "chunk-decoder.h"
+
+void
+ChunkDecoder::parseSizeCharacter(const char a)
+{
+  assert(state_ == State::kSize);
+  if (a >= '0' && a <= '9') {
+    size_ = (size_ << 4) | (a - '0');
+  } else if (a >= 'A' && a <= 'F') {
+    size_ = (size_ << 4) | (a - 'A' + 10);
+  } else if (a >= 'a' && a <= 'f') {
+    size_ = (size_ << 4) | (a - 'a' + 10);
+  } else if (a == '\r') {
+    state_ = size_ == 0 ? State::kEndN : State::kDataN;
+  } else {
+    assert(false); // invalid input
+  }
+}
+
+int
+ChunkDecoder::parseSize(const char *p, const int64_t s)
+{
+  assert(p != NULL);
+  assert(s > 0);
+  int length = 0;
+  while (state_ != State::kData && *p != '\0' && length < s) {
+    assert(state_ < State::kUpperBound); // VALID RANGE
+    switch (state_) {
+    case State::kUnknown:
+    case State::kData:
+    case State::kInvalid:
+    case State::kEnd:
+    case State::kUpperBound:
+      assert(false);
+      break;
+
+    case State::kDataN:
+      assert(*p == '\n');
+      state_ = (*p == '\n') ? State::kData : State::kInvalid;
+      break;
+
+    case State::kEndN:
+      assert(*p == '\n');
+      state_ = (*p == '\n') ? State::kEnd : State::kInvalid;
+      return length;
+
+    case State::kSizeR:
+      assert(*p == '\r');
+      state_ = (*p == '\r') ? State::kSizeN : State::kInvalid;
+      break;
+
+    case State::kSizeN:
+      assert(*p == '\n');
+      state_ = (*p == '\n') ? State::kSize : State::kInvalid;
+      break;
+
+    case State::kSize:
+      parseSizeCharacter(*p);
+      break;
+    }
+    ++length;
+    ++p;
+    assert(state_ != State::kInvalid);
+  }
+  return length;
+}
+
+bool
+ChunkDecoder::isSizeState(void) const
+{
+  return state_ == State::kDataN || state_ == State::kEndN || state_ == 
State::kSize || state_ == State::kSizeN ||
+         state_ == State::kSizeR;
+}
+
+int
+ChunkDecoder::decode(const TSIOBufferReader &r)
+{
+  assert(r != NULL);
+
+  if (state_ == State::kEnd) {
+    return 0;
+  }
+
+  {
+    const int l = TSIOBufferReaderAvail(r);
+    if (l < size_) {
+      size_ -= l;
+      return l;
+    }
+  }
+
+  int64_t size;
+  TSIOBufferBlock block = TSIOBufferReaderStart(r);
+
+  if (isSizeState()) {
+    while (block != NULL && size_ == 0) {
+      const char *p = TSIOBufferBlockReadStart(block, r, &size);
+      assert(p != NULL);
+      const int i = parseSize(p, size);
+      size -= i;
+      TSIOBufferReaderConsume(r, i);
+      if (state_ == State::kEnd) {
+        assert(size_ == 0);
+        return 0;
+      }
+      if (isSizeState()) {
+        assert(size == 0);
+        block = TSIOBufferBlockNext(block);
+      }
+    }
+  }
+
+  int length = 0;
+
+  while (block != NULL && state_ == State::kData) {
+    assert(size_ > 0);
+    const char *p = TSIOBufferBlockReadStart(block, r, &size);
+    if (p != NULL) {
+      if (size > size_) {
+        length += size_;
+        size_ = 0;
+        state_ = State::kSizeR;
+        break;
+      } else {
+        length += size;
+        size_ -= size;
+      }
+    }
+    block = TSIOBufferBlockNext(block);
+  }
+
+  return length;
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/chunk-decoder.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/chunk-decoder.h 
b/plugins/experimental/multiplexer/chunk-decoder.h
new file mode 100644
index 0000000..d81a5fe
--- /dev/null
+++ b/plugins/experimental/multiplexer/chunk-decoder.h
@@ -0,0 +1,66 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef CHUNK_DECODER_H
+#define CHUNK_DECODER_H
+
+#include <ts/ts.h>
+#include <inttypes.h>
+
+class ChunkDecoder
+{
+  struct State {
+    enum STATES {
+      kUnknown,
+
+      kInvalid,
+
+      kData,
+      kDataN,
+      kEnd,
+      kEndN,
+      kSize,
+      kSizeN,
+      kSizeR,
+
+      kUpperBound,
+    };
+  };
+
+  State::STATES state_;
+  int64_t size_;
+
+public:
+  ChunkDecoder(void) : state_(State::kSize), size_(0) {}
+  void parseSizeCharacter(const char);
+  int parseSize(const char *, const int64_t);
+  int decode(const TSIOBufferReader &);
+  bool isSizeState(void) const;
+
+  inline bool
+  isEnd(void) const
+  {
+    return state_ == State::kEnd;
+  }
+};
+
+#endif // CHUNK_DECODER_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/dispatch.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/dispatch.cc 
b/plugins/experimental/multiplexer/dispatch.cc
new file mode 100644
index 0000000..44bb6f5
--- /dev/null
+++ b/plugins/experimental/multiplexer/dispatch.cc
@@ -0,0 +1,243 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <inttypes.h>
+
+#include "dispatch.h"
+#include "fetcher.h"
+#include "original-request.h"
+
+#ifndef PLUGIN_TAG
+#error Please define a PLUGIN_TAG before including this file.
+#endif
+
+extern Statistics statistics;
+
+extern size_t timeout;
+
+Request::Request(const std::string &h, const TSMBuffer b, const TSMLoc l)
+  : host(h), length(TSHttpHdrLengthGet(b, l)), io(new ats::io::IO())
+{
+  assert(!host.empty());
+  assert(b != NULL);
+  assert(l != NULL);
+  assert(io != NULL);
+  assert(length > 0);
+  TSHttpHdrPrint(b, l, io->buffer);
+  assert(length == TSIOBufferReaderAvail(io->reader));
+}
+
+uint64_t
+copy(const TSIOBufferReader &r, const TSIOBuffer b)
+{
+  assert(r != NULL);
+  assert(b != NULL);
+  TSIOBufferBlock block = TSIOBufferReaderStart(r);
+
+  uint64_t length = 0;
+
+  for (; block; block = TSIOBufferBlockNext(block)) {
+    int64_t size = 0;
+    const void *const pointer = TSIOBufferBlockReadStart(block, r, &size);
+
+    if (pointer != NULL && size > 0) {
+      const int64_t size2 = TSIOBufferWrite(b, pointer, size);
+      assert(size == size2);
+      length += size;
+    }
+  }
+
+  return length;
+}
+
+uint64_t
+read(const TSIOBufferReader &r, std::string &o, int64_t l = 0)
+{
+  assert(r != NULL);
+  TSIOBufferBlock block = TSIOBufferReaderStart(r);
+
+  assert(l >= 0);
+  if (l == 0) {
+    l = TSIOBufferReaderAvail(r);
+    assert(l >= 0);
+  }
+
+  uint64_t length = 0;
+
+  for (; block && l > 0; block = TSIOBufferBlockNext(block)) {
+    int64_t size = 0;
+    const char *const pointer = TSIOBufferBlockReadStart(block, r, &size);
+    if (pointer != NULL && size > 0) {
+      size = std::min(size, l);
+      o.append(pointer, size);
+      length += size;
+      l -= size;
+    }
+  }
+
+  return length;
+}
+
+uint64_t
+read(const TSIOBuffer &b, std::string &o, const int64_t l = 0)
+{
+  TSIOBufferReader reader = TSIOBufferReaderAlloc(b);
+  const uint64_t length = read(reader, o);
+  TSIOBufferReaderFree(reader);
+  return length;
+}
+
+class Handler
+{
+  int64_t length;
+  struct timespec start;
+  std::string response;
+
+public:
+  const std::string url;
+
+  Handler(std::string u) : length(0)
+  {
+    assert(!u.empty());
+    const_cast<std::string &>(url).swap(u);
+    clock_gettime(CLOCK_MONOTONIC, &start);
+  }
+
+  void
+  error(void)
+  {
+    TSError("[" PLUGIN_TAG "] error when communicating with \"%s\"\n", 
url.c_str());
+    TSStatIntIncrement(statistics.failures, 1);
+  }
+
+  void
+  timeout(void)
+  {
+    TSError("[" PLUGIN_TAG "] timeout when communicating with \"%s\"\n", 
url.c_str());
+    TSStatIntIncrement(statistics.timeouts, 1);
+  }
+
+  void
+  header(const TSMBuffer b, const TSMLoc l)
+  {
+    if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
+      const TSIOBuffer buffer = TSIOBufferCreate();
+      TSHttpHdrPrint(b, l, buffer);
+      std::string b;
+      read(buffer, b);
+      TSDebug(PLUGIN_TAG, "Response header for \"%s\" was:\n%s", url.c_str(), 
b.c_str());
+      TSIOBufferDestroy(buffer);
+    }
+  }
+
+  void
+  data(const TSIOBufferReader r, const int64_t l)
+  {
+    length += l;
+    if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
+      std::string buffer;
+      const uint64_t length = read(r, buffer, l);
+      response += buffer;
+      TSDebug(PLUGIN_TAG, "Receiving response chunk \"%s\" of %" PRIu64 " 
bytes", buffer.c_str(), length);
+    }
+  }
+
+  void
+  done(void)
+  {
+    struct timespec end;
+
+    clock_gettime(CLOCK_MONOTONIC, &end);
+
+    if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
+      TSDebug(PLUGIN_TAG, "Response for \"%s\" was:\n%s", url.c_str(), 
response.c_str());
+    }
+
+    const long diff = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - 
start.tv_nsec) / 1000;
+
+    TSStatIntIncrement(statistics.hits, 1);
+    TSStatIntIncrement(statistics.time, diff);
+    TSStatIntIncrement(statistics.size, length);
+  }
+};
+
+void
+generateRequests(const Origins &o, const TSMBuffer buffer, const TSMLoc 
location, Requests &r)
+{
+  assert(!o.empty());
+  assert(buffer != NULL);
+  assert(location != NULL);
+
+  Origins::const_iterator iterator = o.begin();
+  const Origins::const_iterator end = o.end();
+
+  OriginalRequest request(buffer, location);
+  request.urlScheme("");
+  request.urlHost("");
+  request.xMultiplexerHeader("copy");
+
+  for (; iterator != end; ++iterator) {
+    const std::string &host = *iterator;
+    assert(!host.empty());
+    request.hostHeader(host);
+    r.push_back(Request(host, buffer, location));
+  }
+}
+
+void
+addBody(Requests &r, const TSIOBufferReader re)
+{
+  assert(re != NULL);
+  Requests::iterator iterator = r.begin();
+  const Requests::iterator end = r.end();
+  const int64_t length = TSIOBufferReaderAvail(re);
+  if (length == 0) {
+    return;
+  }
+  assert(length > 0);
+  for (; iterator != end; ++iterator) {
+    assert(iterator->io != NULL);
+    const int64_t size = copy(re, iterator->io->buffer);
+    assert(size == length);
+    iterator->length += size;
+  }
+}
+
+void
+dispatch(Requests &r, const int t)
+{
+  Requests::iterator iterator = r.begin();
+  const Requests::iterator end = r.end();
+  for (; iterator != end; ++iterator) {
+    assert(iterator->io != NULL);
+    if (TSIsDebugTagSet(PLUGIN_TAG) > 0) {
+      TSDebug(PLUGIN_TAG, "Dispatching %i bytes to \"%s\"", iterator->length, 
iterator->host.c_str());
+      std::string b;
+      read(iterator->io->reader, b);
+      assert(b.size() == static_cast<uint64_t>(iterator->length));
+      TSDebug(PLUGIN_TAG, "%s", b.c_str());
+    }
+    ats::get(iterator->io, iterator->length, Handler(iterator->host), t);
+    // forwarding iterator->io pointer ownership
+    iterator->io = NULL;
+  }
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/dispatch.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/dispatch.h 
b/plugins/experimental/multiplexer/dispatch.h
new file mode 100644
index 0000000..e152742
--- /dev/null
+++ b/plugins/experimental/multiplexer/dispatch.h
@@ -0,0 +1,70 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef DISPATCH_H
+#define DISPATCH_H
+
+#include <assert.h>
+#include <string>
+#include <ts/ts.h>
+#include <vector>
+
+#include "ts.h"
+
+#define CHECK(X)                                         \
+  {                                                      \
+    const TSReturnCode r = static_cast<TSReturnCode>(X); \
+    assert(r == TS_SUCCESS);                             \
+  }
+
+struct Statistics {
+  int failures;
+  int hits;
+  int time; // average
+  int requests;
+  int timeouts;
+  int size; // average
+};
+
+typedef std::vector<std::string> Origins;
+
+struct Request {
+  std::string host;
+  int length;
+  ats::io::IO *io;
+
+  Request(const std::string &, const TSMBuffer, const TSMLoc);
+};
+
+typedef std::vector<Request> Requests;
+
+struct Instance {
+  Origins origins;
+};
+
+extern size_t timeout;
+
+void generateRequests(const Origins &, const TSMBuffer, const TSMLoc, Requests 
&);
+void addBody(Requests &, const TSIOBufferReader);
+void dispatch(Requests &, const int timeout = 0);
+
+#endif // DISPATCH_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/fetcher.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/fetcher.cc 
b/plugins/experimental/multiplexer/fetcher.cc
new file mode 100644
index 0000000..7b60f1d
--- /dev/null
+++ b/plugins/experimental/multiplexer/fetcher.cc
@@ -0,0 +1,64 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include "fetcher.h"
+
+namespace ats
+{
+void
+HttpParser::destroyParser(void)
+{
+  if (parser_ != NULL) {
+    TSHttpParserClear(parser_);
+    TSHttpParserDestroy(parser_);
+    parser_ = NULL;
+  }
+}
+
+bool
+HttpParser::parse(io::IO &io)
+{
+  if (parsed_) {
+    return true;
+  }
+  TSIOBufferBlock block = TSIOBufferReaderStart(io.reader);
+  while (block != NULL) {
+    int64_t size = 0;
+    const char *const begin = TSIOBufferBlockReadStart(block, io.reader, 
&size);
+    const char *iterator = begin;
+
+    parsed_ = (TSHttpHdrParseResp(parser_, buffer_, location_, &iterator, 
iterator + size) == TS_PARSE_DONE);
+    TSIOBufferReaderConsume(io.reader, iterator - begin);
+
+    if (parsed_) {
+      TSDebug(PLUGIN_TAG, "HttpParser: response parsing is complete (%u 
response status code)", statusCode());
+      assert(parser_ != NULL);
+      destroyParser();
+      return true;
+    }
+
+    block = TSIOBufferBlockNext(block);
+  }
+  return false;
+}
+
+} // end of ats namespace

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/fetcher.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/fetcher.h 
b/plugins/experimental/multiplexer/fetcher.h
new file mode 100644
index 0000000..dcaa93b
--- /dev/null
+++ b/plugins/experimental/multiplexer/fetcher.h
@@ -0,0 +1,303 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef NEW_FETCHER_H
+#define NEW_FETCHER_H
+
+#include <arpa/inet.h>
+#include <iostream>
+#include <limits>
+
+#include <inttypes.h>
+
+#include "chunk-decoder.h"
+#include "ts.h"
+
+#ifndef PLUGIN_TAG
+#error Please define a PLUGIN_TAG before including this file.
+#endif
+
+#define unlikely(x) __builtin_expect((x), 0)
+
+namespace ats
+{
+struct HttpParser {
+  bool parsed_;
+  TSHttpParser parser_;
+  TSMBuffer buffer_;
+  TSMLoc location_;
+
+  void destroyParser(void);
+
+  ~HttpParser()
+  {
+    TSHandleMLocRelease(buffer_, TS_NULL_MLOC, location_);
+    TSMBufferDestroy(buffer_);
+    destroyParser();
+  }
+
+  HttpParser(void) : parsed_(false), parser_(TSHttpParserCreate()), 
buffer_(TSMBufferCreate()), location_(TSHttpHdrCreate(buffer_))
+  {
+    TSHttpHdrTypeSet(buffer_, location_, TS_HTTP_TYPE_RESPONSE);
+  }
+
+  bool parse(io::IO &);
+
+  int
+  statusCode(void) const
+  {
+    return static_cast<int>(TSHttpHdrStatusGet(buffer_, location_));
+  }
+};
+
+template <class T> struct HttpTransaction {
+  typedef HttpTransaction<T> Self;
+
+  bool parsingHeaders_;
+  bool abort_;
+  bool timeout_;
+  io::IO *in_;
+  io::IO *out_;
+  TSVConn vconnection_;
+  TSCont continuation_;
+  T t_;
+  HttpParser parser_;
+  ChunkDecoder *chunkDecoder_;
+
+  ~HttpTransaction()
+  {
+    if (in_ != NULL) {
+      delete in_;
+      in_ = NULL;
+    }
+    if (out_ != NULL) {
+      delete out_;
+      out_ = NULL;
+    }
+    timeout(0);
+    assert(vconnection_ != NULL);
+    if (abort_) {
+      TSVConnAbort(vconnection_, TS_VC_CLOSE_ABORT);
+    } else {
+      TSVConnClose(vconnection_);
+    }
+    assert(continuation_ != NULL);
+    TSContDestroy(continuation_);
+    if (chunkDecoder_ != NULL) {
+      delete chunkDecoder_;
+    }
+  }
+
+  HttpTransaction(TSVConn v, TSCont c, io::IO *const i, const uint64_t l, 
const T &t)
+    : parsingHeaders_(false), abort_(false), timeout_(false), in_(NULL), 
out_(i), vconnection_(v), continuation_(c), t_(t),
+      chunkDecoder_(NULL)
+  {
+    assert(vconnection_ != NULL);
+    assert(continuation_ != NULL);
+    assert(out_ != NULL);
+    assert(l > 0);
+    out_->vio = TSVConnWrite(vconnection_, continuation_, out_->reader, l);
+  }
+
+  inline void
+  abort(const bool b = true)
+  {
+    abort_ = b;
+  }
+
+  void
+  timeout(const int64_t t)
+  {
+    assert(t >= 0);
+    assert(vconnection_ != NULL);
+    if (timeout_) {
+      TSVConnActiveTimeoutCancel(vconnection_);
+      timeout_ = false;
+    } else {
+      TSVConnActiveTimeoutSet(vconnection_, t);
+      timeout_ = true;
+    }
+  }
+
+  static void
+  close(Self *const s)
+  {
+    assert(s != NULL);
+    TSVConnShutdown(s->vconnection_, 1, 0);
+    delete s;
+  }
+
+  static bool
+  isChunkEncoding(const TSMBuffer b, const TSMLoc l)
+  {
+    assert(b != NULL);
+    assert(l != NULL);
+    bool result = false;
+    const TSMLoc field = TSMimeHdrFieldFind(b, l, 
TS_MIME_FIELD_TRANSFER_ENCODING, TS_MIME_LEN_TRANSFER_ENCODING);
+    if (field != NULL) {
+      int length;
+      const char *const value = TSMimeHdrFieldValueStringGet(b, l, field, -1, 
&length);
+      if (value != NULL && length == TS_HTTP_LEN_CHUNKED) {
+        result = strncasecmp(value, TS_HTTP_VALUE_CHUNKED, 
TS_HTTP_LEN_CHUNKED) == 0;
+      }
+      TSHandleMLocRelease(b, l, field);
+    }
+    return result;
+  }
+
+  static int
+  handle(TSCont c, TSEvent e, void *data)
+  {
+    Self *const self = static_cast<Self *const>(TSContDataGet(c));
+    assert(self != NULL);
+    switch (e) {
+    case TS_EVENT_ERROR:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: ERROR");
+      self->t_.error();
+      self->abort();
+      close(self);
+      TSContDataSet(c, NULL);
+      break;
+    case TS_EVENT_VCONN_EOS:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: EOS");
+      goto here;
+
+    case TS_EVENT_VCONN_READ_COMPLETE:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: Read Complete");
+      goto here;
+
+    case TS_EVENT_VCONN_READ_READY:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: Read");
+    here : {
+      assert(self->in_ != NULL);
+      assert(self->in_->reader != NULL);
+      assert(self->in_->vio != NULL);
+      int64_t available = TSIOBufferReaderAvail(self->in_->reader);
+      if (available > 0) {
+        TSVIONDoneSet(self->in_->vio, available + 
TSVIONDoneGet(self->in_->vio) + 2);
+        if (self->parsingHeaders_) {
+          if (self->parser_.parse(*self->in_)) {
+            if (isChunkEncoding(self->parser_.buffer_, 
self->parser_.location_)) {
+              assert(self->chunkDecoder_ == NULL);
+              self->chunkDecoder_ = new ChunkDecoder();
+            }
+            self->t_.header(self->parser_.buffer_, self->parser_.location_);
+            self->parsingHeaders_ = false;
+          }
+        }
+        if (!self->parsingHeaders_) {
+          if (self->chunkDecoder_ != NULL) {
+            available = self->chunkDecoder_->decode(self->in_->reader);
+            do {
+              self->t_.data(self->in_->reader, available);
+              TSIOBufferReaderConsume(self->in_->reader, available);
+              available = self->chunkDecoder_->decode(self->in_->reader);
+            } while (available > 0);
+          } else {
+            self->t_.data(self->in_->reader, available);
+            TSIOBufferReaderConsume(self->in_->reader, available);
+          }
+        }
+      }
+      if (e == TS_EVENT_VCONN_READ_COMPLETE || e == TS_EVENT_VCONN_EOS) {
+        self->t_.done();
+        close(self);
+        TSContDataSet(c, NULL);
+      } else if (self->chunkDecoder_ != NULL && self->chunkDecoder_->isEnd()) {
+        assert(self->parsingHeaders_ == false);
+        assert(isChunkEncoding(self->parser_.buffer_, 
self->parser_.location_));
+        self->abort();
+        self->t_.done();
+        close(self);
+        TSContDataSet(c, NULL);
+      } else {
+        TSVIOReenable(self->in_->vio);
+      }
+    } break;
+    case TS_EVENT_VCONN_WRITE_COMPLETE:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: Write Complete");
+      self->parsingHeaders_ = true;
+      assert(self->in_ == NULL);
+      self->in_ = io::IO::read(self->vconnection_, c);
+      assert(self->vconnection_);
+      TSVConnShutdown(self->vconnection_, 0, 1);
+      assert(self->out_ != NULL);
+      delete self->out_;
+      self->out_ = NULL;
+      break;
+    case TS_EVENT_VCONN_WRITE_READY:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: Write Ready (Done: %" PRId64 " 
Todo: %" PRId64 ")", TSVIONDoneGet(self->out_->vio),
+              TSVIONTodoGet(self->out_->vio));
+      assert(self->out_ != NULL);
+      TSVIOReenable(self->out_->vio);
+      break;
+    case 106:
+    case TS_EVENT_TIMEOUT:
+    case TS_EVENT_VCONN_INACTIVITY_TIMEOUT:
+      TSDebug(PLUGIN_TAG, "HttpTransaction: Timeout");
+      self->t_.timeout();
+      self->abort();
+      close(self);
+      TSContDataSet(c, NULL);
+      break;
+
+    default:
+      assert(false); // UNRECHEABLE.
+    }
+    return 0;
+  }
+};
+
+template <class T>
+bool
+get(const std::string &a, io::IO *const i, const int64_t l, const T &t, const 
int64_t ti = 0)
+{
+  typedef HttpTransaction<T> Transaction;
+  struct sockaddr_in socket;
+  socket.sin_family = AF_INET;
+  socket.sin_port = 80;
+  if (!inet_pton(AF_INET, a.c_str(), &socket.sin_addr)) {
+    TSDebug(PLUGIN_TAG, "ats::get Invalid address provided \"%s\".", 
a.c_str());
+    return false;
+  }
+  TSVConn vconn = TSHttpConnect(reinterpret_cast<sockaddr *>(&socket));
+  assert(vconn != NULL);
+  TSCont contp = TSContCreate(Transaction::handle, NULL);
+  assert(contp != NULL);
+  Transaction *transaction = new Transaction(vconn, contp, i, l, t);
+  TSContDataSet(contp, transaction);
+  if (ti > 0) {
+    TSDebug(PLUGIN_TAG, "ats::get Setting active timeout to: %zu", ti);
+    transaction->timeout(ti);
+  }
+  return true;
+}
+
+template <class T>
+bool
+get(io::IO *const i, const int64_t l, const T &t, const int64_t ti = 0)
+{
+  return get("127.0.0.1", i, l, t, ti);
+}
+} // end of ats namespace
+
+#endif // NEW_FETCHER_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/original-request.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/original-request.cc 
b/plugins/experimental/multiplexer/original-request.cc
new file mode 100644
index 0000000..3ea76f9
--- /dev/null
+++ b/plugins/experimental/multiplexer/original-request.cc
@@ -0,0 +1,129 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <cstring>
+
+#include "dispatch.h"
+#include "original-request.h"
+
+template <class T>
+std::string
+get(const TSMBuffer &b, const TSMLoc &l, const T &t)
+{
+  int length = 0;
+  const char *const buffer = t(b, l, &length);
+
+  assert(buffer != NULL);
+  assert(length > 0);
+  assert(strlen(buffer) >= static_cast<unsigned int>(length));
+
+  return std::string(buffer, length);
+}
+
+std::string
+get(const TSMBuffer &b, const TSMLoc &l, const TSMLoc &f, const int i = 0)
+{
+  int length = 0;
+  const char *const buffer = TSMimeHdrFieldValueStringGet(b, l, f, i, &length);
+
+  assert(buffer != NULL);
+  assert(length > 0);
+  assert(strlen(buffer) >= static_cast<unsigned int>(length));
+
+  return std::string(buffer, length);
+}
+
+OriginalRequest::OriginalRequest(const TSMBuffer b, const TSMLoc l) : 
buffer_(b), location_(l)
+{
+  CHECK(TSHttpHdrUrlGet(b, l, &url_));
+
+  assert(url_ != NULL);
+
+  const_cast<std::string &>(original.urlScheme) = get(buffer_, url_, 
TSUrlSchemeGet);
+  const_cast<std::string &>(original.urlHost) = get(buffer_, url_, 
TSUrlHostGet);
+  // TODO(dmorilha): handle port
+
+  /*
+   * this code assumes the request has a single Host header
+   */
+  hostHeader_ = TSMimeHdrFieldFind(b, l, TS_MIME_FIELD_HOST, TS_MIME_LEN_HOST);
+  assert(hostHeader_ != NULL);
+
+  const_cast<std::string &>(original.hostHeader) = get(buffer_, location_, 
hostHeader_);
+
+  xMultiplexerHeader_ = TSMimeHdrFieldFind(b, l, "X-Multiplexer", 13);
+
+  if (xMultiplexerHeader_ != NULL) {
+    const_cast<std::string &>(original.xMultiplexerHeader) = get(buffer_, 
location_, xMultiplexerHeader_);
+  }
+}
+
+OriginalRequest::~OriginalRequest(void)
+{
+  urlScheme(original.urlScheme);
+  urlHost(original.urlHost);
+  hostHeader(original.hostHeader);
+  if (!original.xMultiplexerHeader.empty()) {
+    xMultiplexerHeader(original.xMultiplexerHeader);
+  }
+
+  TSHandleMLocRelease(buffer_, location_, hostHeader_);
+  TSHandleMLocRelease(buffer_, location_, url_);
+}
+
+void
+OriginalRequest::urlScheme(const std::string &s)
+{
+  assert(buffer_ != NULL);
+  assert(url_ != NULL);
+  const TSReturnCode result = TSUrlSchemeSet(buffer_, url_, s.c_str(), 
s.size());
+  assert(result == TS_SUCCESS);
+}
+
+void
+OriginalRequest::urlHost(const std::string &s)
+{
+  assert(buffer_ != NULL);
+  assert(url_ != NULL);
+  CHECK(TSUrlHostSet(buffer_, url_, s.c_str(), s.size()));
+}
+
+void
+OriginalRequest::hostHeader(const std::string &s)
+{
+  assert(buffer_ != NULL);
+  assert(location_ != NULL);
+  assert(hostHeader_ != NULL);
+  CHECK(TSMimeHdrFieldValueStringSet(buffer_, location_, hostHeader_, 0, 
s.c_str(), s.size()));
+}
+
+bool
+OriginalRequest::xMultiplexerHeader(const std::string &s)
+{
+  assert(buffer_ != NULL);
+  assert(location_ != NULL);
+  if (xMultiplexerHeader_ == NULL) {
+    return false;
+  }
+  CHECK(TSMimeHdrFieldValueStringSet(buffer_, location_, xMultiplexerHeader_, 
0, s.c_str(), s.size()));
+  return true;
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/original-request.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/original-request.h 
b/plugins/experimental/multiplexer/original-request.h
new file mode 100644
index 0000000..3814267
--- /dev/null
+++ b/plugins/experimental/multiplexer/original-request.h
@@ -0,0 +1,63 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef ORIGINAL_REQUEST_H
+#define ORIGINAL_REQUEST_H
+
+#include <assert.h>
+#include <string>
+#include <ts/ts.h>
+
+/*
+ * on dispatch we get one parsed request.
+ * So we want to alter and modify it back the way it was originally.
+ */
+class OriginalRequest
+{
+  TSMBuffer buffer_;
+  TSMLoc location_;
+  TSMLoc url_;
+  TSMLoc hostHeader_;
+  TSMLoc xMultiplexerHeader_;
+
+  OriginalRequest(const OriginalRequest &);
+  OriginalRequest &operator=(const OriginalRequest &);
+
+public:
+  struct {
+    const std::string hostHeader;
+    const std::string urlHost;
+    const std::string urlScheme;
+    const std::string xMultiplexerHeader;
+  } original;
+
+  ~OriginalRequest();
+
+  OriginalRequest(const TSMBuffer, const TSMLoc);
+
+  void urlScheme(const std::string &);
+  void urlHost(const std::string &);
+  void hostHeader(const std::string &);
+  bool xMultiplexerHeader(const std::string &);
+};
+
+#endif // ORIGINAL_REQUEST_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/post.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/post.cc 
b/plugins/experimental/multiplexer/post.cc
new file mode 100644
index 0000000..87701d4
--- /dev/null
+++ b/plugins/experimental/multiplexer/post.cc
@@ -0,0 +1,138 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <assert.h>
+#include <limits>
+
+#include "post.h"
+
+#ifndef PLUGIN_TAG
+#error Please define a PLUGIN_TAG before including this file.
+#endif
+
+PostState::~PostState()
+{
+  if (buffer != NULL) {
+    TSIOBufferDestroy(buffer);
+    buffer = NULL;
+  }
+}
+
+PostState::PostState(Requests &r) : buffer(NULL), reader(NULL), vio(NULL)
+{
+  assert(!r.empty());
+  requests.swap(r);
+}
+
+static void
+postTransform(const TSCont c, PostState &s)
+{
+  assert(c != NULL);
+
+  const TSVConn vconnection = TSTransformOutputVConnGet(c);
+  assert(vconnection != NULL);
+
+  const TSVIO vio = TSVConnWriteVIOGet(c);
+  assert(vio != NULL);
+
+  if (!s.buffer) {
+    s.buffer = TSIOBufferCreate();
+    assert(s.buffer != NULL);
+
+    const TSIOBufferReader reader = TSIOBufferReaderAlloc(s.buffer);
+    assert(reader != NULL);
+
+    s.reader = TSIOBufferReaderClone(reader);
+    assert(s.reader != NULL);
+
+    s.vio = TSVConnWrite(vconnection, c, reader, 
std::numeric_limits<int64_t>::max());
+    assert(s.vio != NULL);
+  }
+
+  if (!TSVIOBufferGet(vio)) {
+    TSVIONBytesSet(s.vio, TSVIONDoneGet(vio));
+    TSVIOReenable(s.vio);
+    return;
+  }
+
+  int64_t toWrite = TSVIONTodoGet(vio);
+  assert(toWrite >= 0);
+
+  if (toWrite > 0) {
+    toWrite = std::min(toWrite, TSIOBufferReaderAvail(TSVIOReaderGet(vio)));
+    assert(toWrite >= 0);
+
+    if (toWrite > 0) {
+      TSIOBufferCopy(TSVIOBufferGet(s.vio), TSVIOReaderGet(vio), toWrite, 0);
+      TSIOBufferReaderConsume(TSVIOReaderGet(vio), toWrite);
+      TSVIONDoneSet(vio, TSVIONDoneGet(vio) + toWrite);
+    }
+  }
+
+  if (TSVIONTodoGet(vio) > 0) {
+    if (toWrite > 0) {
+      TSVIOReenable(s.vio);
+      CHECK(TSContCall(TSVIOContGet(vio), TS_EVENT_VCONN_WRITE_READY, vio));
+    }
+  } else {
+    TSVIONBytesSet(s.vio, TSVIONDoneGet(vio));
+    TSVIOReenable(s.vio);
+    CHECK(TSContCall(TSVIOContGet(vio), TS_EVENT_VCONN_WRITE_COMPLETE, vio));
+  }
+}
+
+int
+handlePost(TSCont c, TSEvent e, void *data)
+{
+  assert(c != NULL);
+  // TODO(dmorilha): assert on possible events.
+  PostState *const state = static_cast<PostState *>(TSContDataGet(c));
+  assert(state != NULL);
+  if (TSVConnClosedGet(c)) {
+    assert(data != NULL);
+    if (state->reader != NULL) {
+      addBody(state->requests, state->reader);
+    }
+    dispatch(state->requests, timeout);
+    delete state;
+    TSContDataSet(c, NULL);
+    TSContDestroy(c);
+    return 0;
+  } else {
+    switch (e) {
+    case TS_EVENT_ERROR: {
+      const TSVIO vio = TSVConnWriteVIOGet(c);
+      assert(vio != NULL);
+      CHECK(TSContCall(TSVIOContGet(vio), TS_EVENT_ERROR, vio));
+    } break;
+    case TS_EVENT_VCONN_WRITE_COMPLETE:
+      TSVConnShutdown(TSTransformOutputVConnGet(c), 0, 1);
+      break;
+
+    case TS_EVENT_VCONN_WRITE_READY:
+    default:
+      postTransform(c, *state);
+      break;
+    }
+  }
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/post.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/post.h 
b/plugins/experimental/multiplexer/post.h
new file mode 100644
index 0000000..dfc7be5
--- /dev/null
+++ b/plugins/experimental/multiplexer/post.h
@@ -0,0 +1,43 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef POST_H
+#define POST_H
+
+#include <ts/ts.h>
+
+#include "dispatch.h"
+
+struct PostState {
+  Requests requests;
+
+  TSIOBuffer buffer;
+  TSIOBufferReader reader;
+  TSVIO vio;
+
+  ~PostState();
+  PostState(Requests &);
+};
+
+int handlePost(TSCont, TSEvent, void *);
+
+#endif // POST_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/remap.config
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/remap.config 
b/plugins/experimental/multiplexer/remap.config
new file mode 100644
index 0000000..7e453ad
--- /dev/null
+++ b/plugins/experimental/multiplexer/remap.config
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+map http://localhost http://localhost:8181 @plugin=ats-multiplexer.so 
@pparam=host1 @pparam=host2
+map http://host1 http://localhost:8181
+map http://host2 http://localhost:8181

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/ts.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/ts.cc 
b/plugins/experimental/multiplexer/ts.cc
new file mode 100644
index 0000000..0a53cc9
--- /dev/null
+++ b/plugins/experimental/multiplexer/ts.cc
@@ -0,0 +1,39 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include "ts.h"
+
+namespace ats
+{
+namespace io
+{
+  IO *
+  IO::read(TSVConn v, TSCont c, const int64_t s)
+  {
+    assert(s > 0);
+    IO *io = new IO();
+    io->vio = TSVConnRead(v, c, io->buffer, s);
+    return io;
+  }
+
+} // end of io namespace
+} // end of ats namespace

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/ts.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/multiplexer/ts.h 
b/plugins/experimental/multiplexer/ts.h
new file mode 100644
index 0000000..d5dfdf6
--- /dev/null
+++ b/plugins/experimental/multiplexer/ts.h
@@ -0,0 +1,67 @@
+/** @file
+
+  Multiplexes request to other origins.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef TS_H
+#define TS_H
+#include <assert.h>
+#include <cstring>
+#include <limits>
+#include <list>
+#include <memory>
+#include <string>
+#include <ts/ts.h>
+
+namespace ats
+{
+namespace io
+{
+  struct IO {
+    TSIOBuffer buffer;
+    TSIOBufferReader reader;
+    TSVIO vio;
+
+    ~IO()
+    {
+      assert(buffer != NULL);
+      assert(reader != NULL);
+      const int64_t available = TSIOBufferReaderAvail(reader);
+      if (available > 0) {
+        TSIOBufferReaderConsume(reader, available);
+      }
+      TSIOBufferReaderFree(reader);
+      TSIOBufferDestroy(buffer);
+    }
+
+    IO(void) : buffer(TSIOBufferCreate()), 
reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) {}
+    IO(const TSIOBuffer &b) : buffer(b), 
reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) { assert(buffer != NULL); }
+    static IO *read(TSVConn, TSCont, const int64_t);
+
+    static IO *
+    read(TSVConn v, TSCont c)
+    {
+      return IO::read(v, c, std::numeric_limits<int64_t>::max());
+    }
+  };
+
+} // end of io namespace
+} // end of ats namespace
+#endif // TS_H

Reply via email to