Repository: trafficserver
Updated Branches:
  refs/heads/master 8c38c6c42 -> 7b8417cce


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/ts.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/inliner/ts.cc 
b/plugins/experimental/inliner/ts.cc
new file mode 100644
index 0000000..50d5aa8
--- /dev/null
+++ b/plugins/experimental/inliner/ts.cc
@@ -0,0 +1,471 @@
+/** @file
+
+  Inlines base64 images from the ATS cache
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <cstring>
+#include <iostream>
+#include <limits>
+
+#include "ts.h"
+
+namespace ats
+{
+namespace io
+{
+  IO *
+  IO::read(TSVConn v, TSCont c, const int64_t s)
+  {
+    assert(s > 0);
+    IO *io = new IO();
+    io->vio = TSVConnRead(v, c, io->buffer, s);
+    return io;
+  }
+
+  IO *
+  IO::write(TSVConn v, TSCont c, const int64_t s)
+  {
+    assert(s > 0);
+    IO *io = new IO();
+    io->vio = TSVConnWrite(v, c, io->reader, s);
+    return io;
+  }
+
+  uint64_t
+  IO::copy(const std::string &s) const
+  {
+    assert(buffer != nullptr);
+    const uint64_t size = TSIOBufferWrite(buffer, s.data(), s.size());
+    assert(size == s.size());
+    return size;
+  }
+
+  int64_t
+  IO::consume(void) const
+  {
+    assert(reader != NULL);
+    const int64_t available = TSIOBufferReaderAvail(reader);
+    if (available > 0) {
+      TSIOBufferReaderConsume(reader, available);
+    }
+    return available;
+  }
+
+  int64_t
+  IO::done(void) const
+  {
+    assert(vio != NULL);
+    assert(reader != NULL);
+    const int64_t d = TSIOBufferReaderAvail(reader) + TSVIONDoneGet(vio);
+    TSVIONDoneSet(vio, d);
+    return d;
+  }
+
+  WriteOperation::~WriteOperation()
+  {
+    assert(mutex_ != nullptr);
+    const Lock lock(mutex_);
+    TSDebug(PLUGIN_TAG, "~WriteOperation");
+
+    vio_ = nullptr;
+
+    if (action_ != nullptr) {
+      TSActionCancel(action_);
+    }
+
+    assert(reader_ != nullptr);
+    TSIOBufferReaderFree(reader_);
+
+    assert(buffer_ != nullptr);
+    TSIOBufferDestroy(buffer_);
+
+    assert(continuation_ != nullptr);
+    TSContDestroy(continuation_);
+
+    assert(vconnection_ != nullptr);
+    TSVConnShutdown(vconnection_, 0, 1);
+  }
+
+  WriteOperation::WriteOperation(const TSVConn v, const TSMutex m, const 
size_t t)
+    : vconnection_(v), buffer_(TSIOBufferCreate()), 
reader_(TSIOBufferReaderAlloc(buffer_)),
+      mutex_(m != nullptr ? m : TSMutexCreate()), 
continuation_(TSContCreate(WriteOperation::Handle, mutex_)),
+      vio_(TSVConnWrite(v, continuation_, reader_, 
std::numeric_limits<int64_t>::max())), action_(nullptr), timeout_(t), bytes_(0),
+      reenable_(true)
+  {
+    assert(vconnection_ != nullptr);
+    assert(buffer_ != nullptr);
+    assert(reader_ != nullptr);
+    assert(mutex_ != nullptr);
+    assert(continuation_ != nullptr);
+    assert(vio_ != nullptr);
+
+    if (timeout_ > 0) {
+      action_ = TSContSchedule(continuation_, timeout_, 
TS_THREAD_POOL_DEFAULT);
+      assert(action_ != nullptr);
+    }
+  }
+
+  void
+  WriteOperation::process(const size_t b)
+  {
+    assert(mutex_);
+    const Lock lock(mutex_);
+    bytes_ += b;
+    if (vio_ != nullptr && TSVIOContGet(vio_) != nullptr) {
+      if (reenable_) {
+        TSVIOReenable(vio_);
+        reenable_ = false;
+      }
+    } else {
+      vio_ = nullptr;
+    }
+  }
+
+  int
+  WriteOperation::Handle(const TSCont c, const TSEvent e, void *d)
+  {
+    assert(c != nullptr);
+    WriteOperationPointer *const p = static_cast<WriteOperationPointer 
*>(TSContDataGet(c));
+
+    if (TS_EVENT_VCONN_WRITE_COMPLETE == e) {
+      TSDebug(PLUGIN_TAG, "TS_EVENT_VCONN_WRITE_COMPLETE");
+      if (p != nullptr) {
+        TSContDataSet(c, nullptr);
+        delete p;
+      }
+      return TS_SUCCESS;
+    }
+
+    assert(p != nullptr);
+    assert(*p);
+    WriteOperation &operation = **p;
+    assert(operation.continuation_ == c);
+    assert(operation.vconnection_ != nullptr);
+    assert(d != nullptr);
+    assert(TS_EVENT_ERROR == e || TS_EVENT_TIMEOUT == e || 
TS_EVENT_VCONN_WRITE_READY == e);
+
+    switch (e) {
+    case TS_EVENT_ERROR:
+      TSError("[" PLUGIN_TAG "] TS_EVENT_ERROR from producer");
+      goto handle_error; // handle errors as timeouts
+
+    case TS_EVENT_TIMEOUT:
+      TSError("[" PLUGIN_TAG "] TS_EVENT_TIMEOUT from producer");
+
+    handle_error:
+      operation.close();
+      assert(operation.action_ != nullptr);
+      TSActionDone(operation.action_);
+      operation.action_ = nullptr;
+      /*
+      TSContDataSet(c, nullptr);
+      delete p;
+      */
+      break;
+    case TS_EVENT_VCONN_WRITE_READY:
+      operation.reenable_ = true;
+      break;
+
+    default:
+      TSError("[" PLUGIN_TAG "] Unknown event: %i", e);
+      assert(false); // UNREACHEABLE
+      break;
+    }
+
+    return TS_SUCCESS;
+  }
+
+  WriteOperationWeakPointer
+  WriteOperation::Create(const TSVConn v, const TSMutex m, const size_t t)
+  {
+    WriteOperation *const operation = new WriteOperation(v, m, t);
+    assert(operation != nullptr);
+    WriteOperationPointer *const pointer = new 
WriteOperationPointer(operation);
+    assert(pointer != nullptr);
+    TSContDataSet(operation->continuation_, pointer);
+
+#ifndef NDEBUG
+    {
+      WriteOperationPointer *const p = static_cast<WriteOperationPointer 
*>(TSContDataGet(operation->continuation_));
+      assert(pointer == p);
+      assert((*p).get() == operation);
+    }
+#endif
+
+    return WriteOperationWeakPointer(*pointer);
+  }
+
+  WriteOperation &WriteOperation::operator<<(const TSIOBufferReader r)
+  {
+    assert(r != nullptr);
+    process(TSIOBufferCopy(buffer_, r, TSIOBufferReaderAvail(r), 0));
+    return *this;
+  }
+
+  WriteOperation &WriteOperation::operator<<(const ReaderSize &r)
+  {
+    assert(r.reader != nullptr);
+    process(TSIOBufferCopy(buffer_, r.reader, r.size, r.offset));
+    return *this;
+  }
+
+  WriteOperation &WriteOperation::operator<<(const ReaderOffset &r)
+  {
+    assert(r.reader != nullptr);
+    process(TSIOBufferCopy(buffer_, r.reader, TSIOBufferReaderAvail(r.reader), 
r.offset));
+    return *this;
+  }
+
+  WriteOperation &WriteOperation::operator<<(const char *const s)
+  {
+    assert(s != nullptr);
+    process(TSIOBufferWrite(buffer_, s, strlen(s)));
+    return *this;
+  }
+
+  WriteOperation &WriteOperation::operator<<(const std::string &s)
+  {
+    process(TSIOBufferWrite(buffer_, s.data(), s.size()));
+    return *this;
+  }
+
+  void
+  WriteOperation::close(void)
+  {
+    assert(mutex_ != nullptr);
+    const Lock lock(mutex_);
+    if (vio_ != nullptr && TSVIOContGet(vio_) != nullptr) {
+      TSVIONBytesSet(vio_, bytes_);
+      TSVIOReenable(vio_);
+    }
+    vio_ = nullptr;
+  }
+
+  void
+  WriteOperation::abort(void)
+  {
+    assert(mutex_ != nullptr);
+    const Lock lock(mutex_);
+    vio_ = nullptr;
+  }
+
+  IOSink::~IOSink()
+  {
+    // TSDebug(PLUGIN_TAG, "~IOSink %p", this);
+    const WriteOperationPointer operation = operation_.lock();
+    if (operation) {
+      operation_.reset();
+      operation->close();
+    }
+  }
+
+  void
+  IOSink::process(void)
+  {
+    const WriteOperationPointer operation = operation_.lock();
+
+    if (!data_ || !operation) {
+      return;
+    }
+
+    assert(operation->mutex_ != nullptr);
+    const Lock lock(operation->mutex_);
+
+    assert(operation->buffer_ != nullptr);
+    const Node::Result result = data_->process(operation->buffer_);
+    operation->bytes_ += result.first;
+    operation->process();
+    if (result.second && data_.unique()) {
+      data_.reset();
+    }
+  }
+
+  Lock
+  IOSink::lock(void)
+  {
+    const WriteOperationPointer operation = operation_.lock();
+
+    if (!operation) {
+      return Lock();
+    }
+
+    assert(operation != nullptr);
+    assert(operation->mutex_ != nullptr);
+
+    return Lock(operation->mutex_);
+  }
+
+  void
+  IOSink::abort(void)
+  {
+    const WriteOperationPointer operation = operation_.lock();
+    if (operation) {
+      operation->abort();
+    }
+  }
+
+  BufferNode &BufferNode::operator<<(const TSIOBufferReader r)
+  {
+    assert(r != nullptr);
+    TSIOBufferCopy(buffer_, r, TSIOBufferReaderAvail(r), 0);
+    return *this;
+  }
+
+  BufferNode &BufferNode::operator<<(const ReaderSize &r)
+  {
+    assert(r.reader != nullptr);
+    TSIOBufferCopy(buffer_, r.reader, r.size, r.offset);
+    return *this;
+  }
+
+  BufferNode &BufferNode::operator<<(const ReaderOffset &r)
+  {
+    assert(r.reader != nullptr);
+    TSIOBufferCopy(buffer_, r.reader, TSIOBufferReaderAvail(r.reader), 
r.offset);
+    return *this;
+  }
+
+  BufferNode &BufferNode::operator<<(const char *const s)
+  {
+    assert(s != nullptr);
+    TSIOBufferWrite(buffer_, s, strlen(s));
+    return *this;
+  }
+
+  BufferNode &BufferNode::operator<<(const std::string &s)
+  {
+    TSIOBufferWrite(buffer_, s.data(), s.size());
+    return *this;
+  }
+
+  Node::Result
+  BufferNode::process(const TSIOBuffer b)
+  {
+    assert(b != nullptr);
+    assert(buffer_ != nullptr);
+    assert(reader_ != nullptr);
+    const size_t available = TSIOBufferReaderAvail(reader_);
+    const size_t copied = TSIOBufferCopy(b, reader_, available, 0);
+    assert(copied == available);
+    TSIOBufferReaderConsume(reader_, copied);
+    // TSDebug(PLUGIN_TAG, "BufferNode::process %lu", copied);
+    return Node::Result(copied, TSIOBufferReaderAvail(reader_) == 0);
+  }
+
+  Node::Result
+  StringNode::process(const TSIOBuffer b)
+  {
+    assert(b != nullptr);
+    const size_t copied = TSIOBufferWrite(b, string_.data(), string_.size());
+    assert(copied == string_.size());
+    return Node::Result(copied, true);
+  }
+
+  SinkPointer
+  IOSink::branch(void)
+  {
+    if (!data_) {
+      data_.reset(new Data(shared_from_this()));
+      data_->first_ = true;
+    }
+    SinkPointer pointer(new Sink(data_));
+    // TSDebug(PLUGIN_TAG, "IOSink branch %p", pointer.get());
+    return pointer;
+  }
+
+  SinkPointer
+  Sink::branch(void)
+  {
+    DataPointer data;
+    if (data_) {
+      const bool first = data_->nodes_.empty();
+      data.reset(new Data(data_->root_));
+      assert(data);
+      data_->nodes_.push_back(data);
+      assert(!data_->nodes_.empty());
+      data->first_ = first;
+    }
+    SinkPointer pointer(new Sink(data));
+    // TSDebug(PLUGIN_TAG, "Sink branch %p", pointer.get());
+    return pointer;
+  }
+
+  Sink::~Sink()
+  {
+    // TSDebug(PLUGIN_TAG, "~Sink %p", this);
+    assert(data_);
+    assert(data_.use_count() >= 1);
+    assert(data_->root_);
+    const IOSinkPointer root(std::move(data_->root_));
+    data_.reset();
+    root->process();
+  }
+
+  Node::Result
+  Data::process(const TSIOBuffer b)
+  {
+    assert(b != nullptr);
+    int64_t length = 0;
+
+    const Nodes::iterator begin = nodes_.begin(), end = nodes_.end();
+
+    Nodes::iterator it = begin;
+
+    for (; it != end; ++it) {
+      assert(*it != nullptr);
+      const Node::Result result = (*it)->process(b);
+      length += result.first;
+      if (!result.second || !it->unique()) {
+        break;
+      }
+    }
+
+    // TSDebug(PLUGIN_TAG, "Data::process %li", length);
+
+    if (begin != it) {
+      // TSDebug(PLUGIN_TAG, "Data::process::erase");
+      nodes_.erase(begin, it);
+      if (it != end) {
+        Data *data = dynamic_cast<Data *>(it->get());
+        while (data != nullptr) {
+          // TSDebug(PLUGIN_TAG, "new first");
+          data->first_ = true;
+          if (data->nodes_.empty()) {
+            break;
+          }
+          assert(data->nodes_.front());
+          data = dynamic_cast<Data *>(data->nodes_.front().get());
+        }
+      }
+    }
+
+    return Node::Result(length, nodes_.empty());
+  }
+
+  Sink &Sink::operator<<(std::string &&s)
+  {
+    if (data_) {
+      data_->nodes_.emplace_back(new StringNode(std::move(s)));
+    }
+    return *this;
+  }
+
+} // end of io namespace
+} // end of ats namespace

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/ts.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/inliner/ts.h 
b/plugins/experimental/inliner/ts.h
new file mode 100644
index 0000000..704c0a5
--- /dev/null
+++ b/plugins/experimental/inliner/ts.h
@@ -0,0 +1,319 @@
+/** @file
+
+  Inlines base64 images from the ATS cache
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef TS_H
+#define TS_H
+
+#include <assert.h>
+#include <limits>
+#include <list>
+#include <memory>
+#include <ts/ts.h>
+
+#ifdef NDEBUG
+#define CHECK(X) X
+#else
+#define CHECK(X)                                         \
+  {                                                      \
+    const TSReturnCode r = static_cast<TSReturnCode>(X); \
+    assert(r == TS_SUCCESS);                             \
+  }
+#endif
+
+namespace ats
+{
+namespace io
+{
+  // TODO(dmorilha): dislike this
+  struct IO {
+    TSIOBuffer buffer;
+    TSIOBufferReader reader;
+    TSVIO vio;
+
+    ~IO()
+    {
+      consume();
+      assert(reader != NULL);
+      TSIOBufferReaderFree(reader);
+      assert(buffer != NULL);
+      TSIOBufferDestroy(buffer);
+    }
+
+    IO(void) : buffer(TSIOBufferCreate()), 
reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) {}
+    IO(const TSIOBuffer &b) : buffer(b), 
reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) { assert(buffer != NULL); }
+    static IO *read(TSVConn, TSCont, const int64_t);
+
+    static IO *
+    read(TSVConn v, TSCont c)
+    {
+      return IO::read(v, c, std::numeric_limits<int64_t>::max());
+    }
+
+    static IO *write(TSVConn, TSCont, const int64_t);
+
+    static IO *
+    write(TSVConn v, TSCont c)
+    {
+      return IO::write(v, c, std::numeric_limits<int64_t>::max());
+    }
+
+    uint64_t copy(const std::string &) const;
+
+    int64_t consume(void) const;
+
+    int64_t done(void) const;
+  };
+
+  struct ReaderSize {
+    const TSIOBufferReader reader;
+    const size_t offset;
+    const size_t size;
+
+    ReaderSize(const TSIOBufferReader r, const size_t s, const size_t o = 0) : 
reader(r), offset(o), size(s)
+    {
+      assert(reader != NULL);
+    }
+
+    ReaderSize(const ReaderSize &) = delete;
+    ReaderSize &operator=(const ReaderSize &) = delete;
+    void *operator new(const std::size_t) throw(std::bad_alloc) = delete;
+  };
+
+  struct ReaderOffset {
+    const TSIOBufferReader reader;
+    const size_t offset;
+
+    ReaderOffset(const TSIOBufferReader r, const size_t o) : reader(r), 
offset(o) { assert(reader != NULL); }
+    ReaderOffset(const ReaderOffset &) = delete;
+    ReaderOffset &operator=(const ReaderOffset &) = delete;
+    void *operator new(const std::size_t) throw(std::bad_alloc) = delete;
+  };
+
+  struct WriteOperation;
+
+  typedef std::shared_ptr<WriteOperation> WriteOperationPointer;
+  typedef std::weak_ptr<WriteOperation> WriteOperationWeakPointer;
+
+  struct Lock {
+    const TSMutex mutex_;
+
+    ~Lock()
+    {
+      if (mutex_ != nullptr) {
+        TSMutexUnlock(mutex_);
+      }
+    }
+
+    Lock(const TSMutex m) : mutex_(m)
+    {
+      if (mutex_ != nullptr) {
+        TSMutexLock(mutex_);
+      }
+    }
+
+    Lock(void) : mutex_(nullptr) {}
+    Lock(const Lock &) = delete;
+
+    Lock(Lock &&l) : mutex_(l.mutex_) { const_cast<TSMutex &>(l.mutex_) = 
nullptr; }
+    Lock &operator=(const Lock &) = delete;
+  };
+
+  struct WriteOperation : std::enable_shared_from_this<WriteOperation> {
+    TSVConn vconnection_;
+    TSIOBuffer buffer_;
+    TSIOBufferReader reader_;
+    TSMutex mutex_;
+    TSCont continuation_;
+    TSVIO vio_;
+    TSAction action_;
+    const size_t timeout_;
+    size_t bytes_;
+    bool reenable_;
+
+    static int Handle(TSCont, TSEvent, void *);
+    static WriteOperationWeakPointer Create(const TSVConn, const TSMutex mutex 
= nullptr, const size_t timeout = 0);
+
+    ~WriteOperation();
+
+    WriteOperation(const WriteOperation &) = delete;
+    WriteOperation &operator=(const WriteOperation &) = delete;
+
+    WriteOperation &operator<<(const TSIOBufferReader);
+    WriteOperation &operator<<(const ReaderSize &);
+    WriteOperation &operator<<(const ReaderOffset &);
+    WriteOperation &operator<<(const char *const);
+    WriteOperation &operator<<(const std::string &);
+
+    void process(const size_t b = 0);
+    void close(void);
+    void abort(void);
+
+  private:
+    WriteOperation(const TSVConn, const TSMutex, const size_t);
+  };
+
+  struct Node;
+  typedef std::shared_ptr<Node> NodePointer;
+  typedef std::list<NodePointer> Nodes;
+
+  struct IOSink;
+  typedef std::shared_ptr<IOSink> IOSinkPointer;
+
+  struct Sink;
+  typedef std::shared_ptr<Sink> SinkPointer;
+
+  struct Data;
+  typedef std::shared_ptr<Data> DataPointer;
+
+  struct IOSink : std::enable_shared_from_this<IOSink> {
+    WriteOperationWeakPointer operation_;
+    DataPointer data_;
+
+    ~IOSink();
+    IOSink(const IOSink &) = delete;
+
+    IOSink &operator=(const IOSink &) = delete;
+
+    template <class T> IOSink &operator<<(T &&t)
+    {
+      const WriteOperationPointer operation = operation_.lock();
+      if (operation) {
+        const Lock lock(operation->mutex_);
+        *operation << std::forward<T>(t);
+      }
+      return *this;
+    }
+
+    template <class... A>
+    static IOSinkPointer
+    Create(A &&... a)
+    {
+      return IOSinkPointer(new 
IOSink(WriteOperation::Create(std::forward<A>(a)...)));
+    }
+
+    void process(void);
+    SinkPointer branch(void);
+    Lock lock(void);
+    void abort(void);
+
+  private:
+    IOSink(WriteOperationWeakPointer &&p) : operation_(std::move(p)) {}
+  };
+
+  struct Node {
+    typedef std::pair<size_t, bool> Result;
+    IOSinkPointer ioSink_;
+    virtual ~Node() {}
+    virtual Node::Result process(const TSIOBuffer) = 0;
+  };
+
+  struct StringNode : Node {
+    std::string string_;
+    explicit StringNode(std::string &&s) : string_(std::move(s)) {}
+    Node::Result process(const TSIOBuffer);
+  };
+
+  struct BufferNode : Node {
+    const TSIOBuffer buffer_;
+    const TSIOBufferReader reader_;
+
+    ~BufferNode()
+    {
+      assert(reader_ != nullptr);
+      TSIOBufferReaderFree(reader_);
+      assert(buffer_ != nullptr);
+      TSIOBufferDestroy(buffer_);
+    }
+
+    BufferNode(void) : buffer_(TSIOBufferCreate()), 
reader_(TSIOBufferReaderAlloc(buffer_))
+    {
+      assert(buffer_ != nullptr);
+      assert(reader_ != nullptr);
+    }
+
+    BufferNode(const BufferNode &) = delete;
+    BufferNode &operator=(const BufferNode &) = delete;
+    BufferNode &operator<<(const TSIOBufferReader);
+    BufferNode &operator<<(const ReaderSize &);
+    BufferNode &operator<<(const ReaderOffset &);
+    BufferNode &operator<<(const char *const);
+    BufferNode &operator<<(const std::string &);
+    Node::Result process(const TSIOBuffer);
+  };
+
+  struct Data : Node {
+    Nodes nodes_;
+    IOSinkPointer root_;
+    bool first_;
+
+    template <class T> Data(T &&t) : root_(std::forward<T>(t)), first_(false) 
{}
+    Data(const Data &) = delete;
+    Data &operator=(const Data &) = delete;
+
+    Node::Result process(const TSIOBuffer);
+  };
+
+  struct Sink {
+    DataPointer data_;
+
+    ~Sink();
+
+    template <class... A> Sink(A &&... a) : data_(std::forward<A>(a)...) {}
+    Sink(const Sink &) = delete;
+    Sink &operator=(const Sink &) = delete;
+
+    SinkPointer branch(void);
+
+    Sink &operator<<(std::string &&);
+
+    template <class T> Sink &operator<<(T &&t)
+    {
+      if (data_) {
+        const Lock lock = data_->root_->lock();
+        assert(data_->root_ != nullptr);
+        const bool empty = data_->nodes_.empty();
+        if (data_->first_ && empty) {
+          // TSDebug(PLUGIN_TAG, "flushing");
+          assert(data_->root_);
+          *data_->root_ << std::forward<T>(t);
+        } else {
+          // TSDebug(PLUGIN_TAG, "buffering");
+          BufferNode *buffer = nullptr;
+          if (!empty) {
+            buffer = dynamic_cast<BufferNode *>(data_->nodes_.back().get());
+          }
+          if (buffer == nullptr) {
+            data_->nodes_.emplace_back(new BufferNode());
+            buffer = reinterpret_cast<BufferNode 
*>(data_->nodes_.back().get());
+          }
+          assert(buffer != nullptr);
+          *buffer << std::forward<T>(t);
+        }
+      }
+      return *this;
+    }
+  };
+
+} // end of io namespace
+} // end of ats namespace
+
+#endif // TS_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/util.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/inliner/util.h 
b/plugins/experimental/inliner/util.h
new file mode 100644
index 0000000..2fcc42e
--- /dev/null
+++ b/plugins/experimental/inliner/util.h
@@ -0,0 +1,43 @@
+/** @file
+
+  Inlines base64 images from the ATS cache
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef UTIL_H
+#define UTIL_H
+
+#include <vector>
+
+#define DISALLOW_COPY_AND_ASSIGN(T) \
+private:                            \
+  T(const T &);                     \
+  void operator=(const T &)
+
+#define DISALLOW_IMPLICIT_CONSTRUCTORS(T) \
+private:                                  \
+  T(void);                                \
+  DISALLOW_COPY_AND_ASSIGN(T)
+
+namespace util
+{
+typedef std::vector<char> Buffer;
+} // end of util namespace
+
+#endif // UTIL_H

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/vconnection.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/inliner/vconnection.h 
b/plugins/experimental/inliner/vconnection.h
new file mode 100644
index 0000000..026c41f
--- /dev/null
+++ b/plugins/experimental/inliner/vconnection.h
@@ -0,0 +1,105 @@
+/** @file
+
+  Inlines base64 images from the ATS cache
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef VCONNECTION_H
+#define VCONNECTION_H
+
+#include <assert.h>
+
+#include "ts.h"
+
+namespace ats
+{
+namespace io
+{
+  namespace vconnection
+  {
+    template <class T> class Read
+    {
+      typedef Read<T> Self;
+      TSVConn vconnection_;
+      io::IO in_;
+      T t_;
+
+      Read(TSVConn v, T &&t, const int64_t s) : vconnection_(v), 
t_(std::forward<T>(t))
+      {
+        assert(vconnection_ != nullptr);
+        TSCont continuation = TSContCreate(Self::handleRead, nullptr);
+        assert(continuation != nullptr);
+        TSContDataSet(continuation, this);
+        in_.vio = TSVConnRead(vconnection_, continuation, in_.buffer, s);
+      }
+
+      static void
+      close(Self *const s)
+      {
+        assert(s != nullptr);
+        TSIOBufferReaderConsume(s->in_.reader, 
TSIOBufferReaderAvail(s->in_.reader));
+        assert(s->vconnection_ != nullptr);
+        TSVConnShutdown(s->vconnection_, 1, 1);
+        TSVConnClose(s->vconnection_);
+        delete s;
+      }
+
+      static int
+      handleRead(TSCont c, TSEvent e, void *)
+      {
+        Self *const self = static_cast<Self *const>(TSContDataGet(c));
+        assert(self != nullptr);
+        switch (e) {
+        case TS_EVENT_VCONN_EOS:
+        case TS_EVENT_VCONN_READ_COMPLETE:
+        case TS_EVENT_VCONN_READ_READY: {
+          const int64_t available = TSIOBufferReaderAvail(self->in_.reader);
+          if (available > 0) {
+            self->t_.data(self->in_.reader);
+            TSIOBufferReaderConsume(self->in_.reader, available);
+          }
+          if (e == TS_EVENT_VCONN_READ_COMPLETE || e == TS_EVENT_VCONN_EOS) {
+            self->t_.done();
+            close(self);
+            TSContDataSet(c, nullptr);
+            TSContDestroy(c);
+          }
+        } break;
+        default:
+          assert(false); // UNRECHEABLE.
+          break;
+        }
+        return TS_SUCCESS;
+      }
+
+      template <class U> friend void read(TSVConn, U &&, const int64_t);
+    };
+
+    template <class C>
+    void
+    read(TSVConn v, C &&c, const int64_t s)
+    {
+      new Read<C>(v, std::forward<C>(c), s);
+    }
+
+  } // end of vconnection namespace
+} // end of io namespace
+} // end of ats namespace
+
+#endif // VCONNECTION_H

Reply via email to