Repository: trafficserver Updated Branches: refs/heads/master 8c38c6c42 -> 7b8417cce
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/ts.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/inliner/ts.cc b/plugins/experimental/inliner/ts.cc new file mode 100644 index 0000000..50d5aa8 --- /dev/null +++ b/plugins/experimental/inliner/ts.cc @@ -0,0 +1,471 @@ +/** @file + + Inlines base64 images from the ATS cache + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include <cstring> +#include <iostream> +#include <limits> + +#include "ts.h" + +namespace ats +{ +namespace io +{ + IO * + IO::read(TSVConn v, TSCont c, const int64_t s) + { + assert(s > 0); + IO *io = new IO(); + io->vio = TSVConnRead(v, c, io->buffer, s); + return io; + } + + IO * + IO::write(TSVConn v, TSCont c, const int64_t s) + { + assert(s > 0); + IO *io = new IO(); + io->vio = TSVConnWrite(v, c, io->reader, s); + return io; + } + + uint64_t + IO::copy(const std::string &s) const + { + assert(buffer != nullptr); + const uint64_t size = TSIOBufferWrite(buffer, s.data(), s.size()); + assert(size == s.size()); + return size; + } + + int64_t + IO::consume(void) const + { + assert(reader != NULL); + const int64_t available = TSIOBufferReaderAvail(reader); + if (available > 0) { + TSIOBufferReaderConsume(reader, available); + } + return available; + } + + int64_t + IO::done(void) const + { + assert(vio != NULL); + assert(reader != NULL); + const int64_t d = TSIOBufferReaderAvail(reader) + TSVIONDoneGet(vio); + TSVIONDoneSet(vio, d); + return d; + } + + WriteOperation::~WriteOperation() + { + assert(mutex_ != nullptr); + const Lock lock(mutex_); + TSDebug(PLUGIN_TAG, "~WriteOperation"); + + vio_ = nullptr; + + if (action_ != nullptr) { + TSActionCancel(action_); + } + + assert(reader_ != nullptr); + TSIOBufferReaderFree(reader_); + + assert(buffer_ != nullptr); + TSIOBufferDestroy(buffer_); + + assert(continuation_ != nullptr); + TSContDestroy(continuation_); + + assert(vconnection_ != nullptr); + TSVConnShutdown(vconnection_, 0, 1); + } + + WriteOperation::WriteOperation(const TSVConn v, const TSMutex m, const size_t t) + : vconnection_(v), buffer_(TSIOBufferCreate()), reader_(TSIOBufferReaderAlloc(buffer_)), + mutex_(m != nullptr ? m : TSMutexCreate()), continuation_(TSContCreate(WriteOperation::Handle, mutex_)), + vio_(TSVConnWrite(v, continuation_, reader_, std::numeric_limits<int64_t>::max())), action_(nullptr), timeout_(t), bytes_(0), + reenable_(true) + { + assert(vconnection_ != nullptr); + assert(buffer_ != nullptr); + assert(reader_ != nullptr); + assert(mutex_ != nullptr); + assert(continuation_ != nullptr); + assert(vio_ != nullptr); + + if (timeout_ > 0) { + action_ = TSContSchedule(continuation_, timeout_, TS_THREAD_POOL_DEFAULT); + assert(action_ != nullptr); + } + } + + void + WriteOperation::process(const size_t b) + { + assert(mutex_); + const Lock lock(mutex_); + bytes_ += b; + if (vio_ != nullptr && TSVIOContGet(vio_) != nullptr) { + if (reenable_) { + TSVIOReenable(vio_); + reenable_ = false; + } + } else { + vio_ = nullptr; + } + } + + int + WriteOperation::Handle(const TSCont c, const TSEvent e, void *d) + { + assert(c != nullptr); + WriteOperationPointer *const p = static_cast<WriteOperationPointer *>(TSContDataGet(c)); + + if (TS_EVENT_VCONN_WRITE_COMPLETE == e) { + TSDebug(PLUGIN_TAG, "TS_EVENT_VCONN_WRITE_COMPLETE"); + if (p != nullptr) { + TSContDataSet(c, nullptr); + delete p; + } + return TS_SUCCESS; + } + + assert(p != nullptr); + assert(*p); + WriteOperation &operation = **p; + assert(operation.continuation_ == c); + assert(operation.vconnection_ != nullptr); + assert(d != nullptr); + assert(TS_EVENT_ERROR == e || TS_EVENT_TIMEOUT == e || TS_EVENT_VCONN_WRITE_READY == e); + + switch (e) { + case TS_EVENT_ERROR: + TSError("[" PLUGIN_TAG "] TS_EVENT_ERROR from producer"); + goto handle_error; // handle errors as timeouts + + case TS_EVENT_TIMEOUT: + TSError("[" PLUGIN_TAG "] TS_EVENT_TIMEOUT from producer"); + + handle_error: + operation.close(); + assert(operation.action_ != nullptr); + TSActionDone(operation.action_); + operation.action_ = nullptr; + /* + TSContDataSet(c, nullptr); + delete p; + */ + break; + case TS_EVENT_VCONN_WRITE_READY: + operation.reenable_ = true; + break; + + default: + TSError("[" PLUGIN_TAG "] Unknown event: %i", e); + assert(false); // UNREACHEABLE + break; + } + + return TS_SUCCESS; + } + + WriteOperationWeakPointer + WriteOperation::Create(const TSVConn v, const TSMutex m, const size_t t) + { + WriteOperation *const operation = new WriteOperation(v, m, t); + assert(operation != nullptr); + WriteOperationPointer *const pointer = new WriteOperationPointer(operation); + assert(pointer != nullptr); + TSContDataSet(operation->continuation_, pointer); + +#ifndef NDEBUG + { + WriteOperationPointer *const p = static_cast<WriteOperationPointer *>(TSContDataGet(operation->continuation_)); + assert(pointer == p); + assert((*p).get() == operation); + } +#endif + + return WriteOperationWeakPointer(*pointer); + } + + WriteOperation &WriteOperation::operator<<(const TSIOBufferReader r) + { + assert(r != nullptr); + process(TSIOBufferCopy(buffer_, r, TSIOBufferReaderAvail(r), 0)); + return *this; + } + + WriteOperation &WriteOperation::operator<<(const ReaderSize &r) + { + assert(r.reader != nullptr); + process(TSIOBufferCopy(buffer_, r.reader, r.size, r.offset)); + return *this; + } + + WriteOperation &WriteOperation::operator<<(const ReaderOffset &r) + { + assert(r.reader != nullptr); + process(TSIOBufferCopy(buffer_, r.reader, TSIOBufferReaderAvail(r.reader), r.offset)); + return *this; + } + + WriteOperation &WriteOperation::operator<<(const char *const s) + { + assert(s != nullptr); + process(TSIOBufferWrite(buffer_, s, strlen(s))); + return *this; + } + + WriteOperation &WriteOperation::operator<<(const std::string &s) + { + process(TSIOBufferWrite(buffer_, s.data(), s.size())); + return *this; + } + + void + WriteOperation::close(void) + { + assert(mutex_ != nullptr); + const Lock lock(mutex_); + if (vio_ != nullptr && TSVIOContGet(vio_) != nullptr) { + TSVIONBytesSet(vio_, bytes_); + TSVIOReenable(vio_); + } + vio_ = nullptr; + } + + void + WriteOperation::abort(void) + { + assert(mutex_ != nullptr); + const Lock lock(mutex_); + vio_ = nullptr; + } + + IOSink::~IOSink() + { + // TSDebug(PLUGIN_TAG, "~IOSink %p", this); + const WriteOperationPointer operation = operation_.lock(); + if (operation) { + operation_.reset(); + operation->close(); + } + } + + void + IOSink::process(void) + { + const WriteOperationPointer operation = operation_.lock(); + + if (!data_ || !operation) { + return; + } + + assert(operation->mutex_ != nullptr); + const Lock lock(operation->mutex_); + + assert(operation->buffer_ != nullptr); + const Node::Result result = data_->process(operation->buffer_); + operation->bytes_ += result.first; + operation->process(); + if (result.second && data_.unique()) { + data_.reset(); + } + } + + Lock + IOSink::lock(void) + { + const WriteOperationPointer operation = operation_.lock(); + + if (!operation) { + return Lock(); + } + + assert(operation != nullptr); + assert(operation->mutex_ != nullptr); + + return Lock(operation->mutex_); + } + + void + IOSink::abort(void) + { + const WriteOperationPointer operation = operation_.lock(); + if (operation) { + operation->abort(); + } + } + + BufferNode &BufferNode::operator<<(const TSIOBufferReader r) + { + assert(r != nullptr); + TSIOBufferCopy(buffer_, r, TSIOBufferReaderAvail(r), 0); + return *this; + } + + BufferNode &BufferNode::operator<<(const ReaderSize &r) + { + assert(r.reader != nullptr); + TSIOBufferCopy(buffer_, r.reader, r.size, r.offset); + return *this; + } + + BufferNode &BufferNode::operator<<(const ReaderOffset &r) + { + assert(r.reader != nullptr); + TSIOBufferCopy(buffer_, r.reader, TSIOBufferReaderAvail(r.reader), r.offset); + return *this; + } + + BufferNode &BufferNode::operator<<(const char *const s) + { + assert(s != nullptr); + TSIOBufferWrite(buffer_, s, strlen(s)); + return *this; + } + + BufferNode &BufferNode::operator<<(const std::string &s) + { + TSIOBufferWrite(buffer_, s.data(), s.size()); + return *this; + } + + Node::Result + BufferNode::process(const TSIOBuffer b) + { + assert(b != nullptr); + assert(buffer_ != nullptr); + assert(reader_ != nullptr); + const size_t available = TSIOBufferReaderAvail(reader_); + const size_t copied = TSIOBufferCopy(b, reader_, available, 0); + assert(copied == available); + TSIOBufferReaderConsume(reader_, copied); + // TSDebug(PLUGIN_TAG, "BufferNode::process %lu", copied); + return Node::Result(copied, TSIOBufferReaderAvail(reader_) == 0); + } + + Node::Result + StringNode::process(const TSIOBuffer b) + { + assert(b != nullptr); + const size_t copied = TSIOBufferWrite(b, string_.data(), string_.size()); + assert(copied == string_.size()); + return Node::Result(copied, true); + } + + SinkPointer + IOSink::branch(void) + { + if (!data_) { + data_.reset(new Data(shared_from_this())); + data_->first_ = true; + } + SinkPointer pointer(new Sink(data_)); + // TSDebug(PLUGIN_TAG, "IOSink branch %p", pointer.get()); + return pointer; + } + + SinkPointer + Sink::branch(void) + { + DataPointer data; + if (data_) { + const bool first = data_->nodes_.empty(); + data.reset(new Data(data_->root_)); + assert(data); + data_->nodes_.push_back(data); + assert(!data_->nodes_.empty()); + data->first_ = first; + } + SinkPointer pointer(new Sink(data)); + // TSDebug(PLUGIN_TAG, "Sink branch %p", pointer.get()); + return pointer; + } + + Sink::~Sink() + { + // TSDebug(PLUGIN_TAG, "~Sink %p", this); + assert(data_); + assert(data_.use_count() >= 1); + assert(data_->root_); + const IOSinkPointer root(std::move(data_->root_)); + data_.reset(); + root->process(); + } + + Node::Result + Data::process(const TSIOBuffer b) + { + assert(b != nullptr); + int64_t length = 0; + + const Nodes::iterator begin = nodes_.begin(), end = nodes_.end(); + + Nodes::iterator it = begin; + + for (; it != end; ++it) { + assert(*it != nullptr); + const Node::Result result = (*it)->process(b); + length += result.first; + if (!result.second || !it->unique()) { + break; + } + } + + // TSDebug(PLUGIN_TAG, "Data::process %li", length); + + if (begin != it) { + // TSDebug(PLUGIN_TAG, "Data::process::erase"); + nodes_.erase(begin, it); + if (it != end) { + Data *data = dynamic_cast<Data *>(it->get()); + while (data != nullptr) { + // TSDebug(PLUGIN_TAG, "new first"); + data->first_ = true; + if (data->nodes_.empty()) { + break; + } + assert(data->nodes_.front()); + data = dynamic_cast<Data *>(data->nodes_.front().get()); + } + } + } + + return Node::Result(length, nodes_.empty()); + } + + Sink &Sink::operator<<(std::string &&s) + { + if (data_) { + data_->nodes_.emplace_back(new StringNode(std::move(s))); + } + return *this; + } + +} // end of io namespace +} // end of ats namespace http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/ts.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/inliner/ts.h b/plugins/experimental/inliner/ts.h new file mode 100644 index 0000000..704c0a5 --- /dev/null +++ b/plugins/experimental/inliner/ts.h @@ -0,0 +1,319 @@ +/** @file + + Inlines base64 images from the ATS cache + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef TS_H +#define TS_H + +#include <assert.h> +#include <limits> +#include <list> +#include <memory> +#include <ts/ts.h> + +#ifdef NDEBUG +#define CHECK(X) X +#else +#define CHECK(X) \ + { \ + const TSReturnCode r = static_cast<TSReturnCode>(X); \ + assert(r == TS_SUCCESS); \ + } +#endif + +namespace ats +{ +namespace io +{ + // TODO(dmorilha): dislike this + struct IO { + TSIOBuffer buffer; + TSIOBufferReader reader; + TSVIO vio; + + ~IO() + { + consume(); + assert(reader != NULL); + TSIOBufferReaderFree(reader); + assert(buffer != NULL); + TSIOBufferDestroy(buffer); + } + + IO(void) : buffer(TSIOBufferCreate()), reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) {} + IO(const TSIOBuffer &b) : buffer(b), reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) { assert(buffer != NULL); } + static IO *read(TSVConn, TSCont, const int64_t); + + static IO * + read(TSVConn v, TSCont c) + { + return IO::read(v, c, std::numeric_limits<int64_t>::max()); + } + + static IO *write(TSVConn, TSCont, const int64_t); + + static IO * + write(TSVConn v, TSCont c) + { + return IO::write(v, c, std::numeric_limits<int64_t>::max()); + } + + uint64_t copy(const std::string &) const; + + int64_t consume(void) const; + + int64_t done(void) const; + }; + + struct ReaderSize { + const TSIOBufferReader reader; + const size_t offset; + const size_t size; + + ReaderSize(const TSIOBufferReader r, const size_t s, const size_t o = 0) : reader(r), offset(o), size(s) + { + assert(reader != NULL); + } + + ReaderSize(const ReaderSize &) = delete; + ReaderSize &operator=(const ReaderSize &) = delete; + void *operator new(const std::size_t) throw(std::bad_alloc) = delete; + }; + + struct ReaderOffset { + const TSIOBufferReader reader; + const size_t offset; + + ReaderOffset(const TSIOBufferReader r, const size_t o) : reader(r), offset(o) { assert(reader != NULL); } + ReaderOffset(const ReaderOffset &) = delete; + ReaderOffset &operator=(const ReaderOffset &) = delete; + void *operator new(const std::size_t) throw(std::bad_alloc) = delete; + }; + + struct WriteOperation; + + typedef std::shared_ptr<WriteOperation> WriteOperationPointer; + typedef std::weak_ptr<WriteOperation> WriteOperationWeakPointer; + + struct Lock { + const TSMutex mutex_; + + ~Lock() + { + if (mutex_ != nullptr) { + TSMutexUnlock(mutex_); + } + } + + Lock(const TSMutex m) : mutex_(m) + { + if (mutex_ != nullptr) { + TSMutexLock(mutex_); + } + } + + Lock(void) : mutex_(nullptr) {} + Lock(const Lock &) = delete; + + Lock(Lock &&l) : mutex_(l.mutex_) { const_cast<TSMutex &>(l.mutex_) = nullptr; } + Lock &operator=(const Lock &) = delete; + }; + + struct WriteOperation : std::enable_shared_from_this<WriteOperation> { + TSVConn vconnection_; + TSIOBuffer buffer_; + TSIOBufferReader reader_; + TSMutex mutex_; + TSCont continuation_; + TSVIO vio_; + TSAction action_; + const size_t timeout_; + size_t bytes_; + bool reenable_; + + static int Handle(TSCont, TSEvent, void *); + static WriteOperationWeakPointer Create(const TSVConn, const TSMutex mutex = nullptr, const size_t timeout = 0); + + ~WriteOperation(); + + WriteOperation(const WriteOperation &) = delete; + WriteOperation &operator=(const WriteOperation &) = delete; + + WriteOperation &operator<<(const TSIOBufferReader); + WriteOperation &operator<<(const ReaderSize &); + WriteOperation &operator<<(const ReaderOffset &); + WriteOperation &operator<<(const char *const); + WriteOperation &operator<<(const std::string &); + + void process(const size_t b = 0); + void close(void); + void abort(void); + + private: + WriteOperation(const TSVConn, const TSMutex, const size_t); + }; + + struct Node; + typedef std::shared_ptr<Node> NodePointer; + typedef std::list<NodePointer> Nodes; + + struct IOSink; + typedef std::shared_ptr<IOSink> IOSinkPointer; + + struct Sink; + typedef std::shared_ptr<Sink> SinkPointer; + + struct Data; + typedef std::shared_ptr<Data> DataPointer; + + struct IOSink : std::enable_shared_from_this<IOSink> { + WriteOperationWeakPointer operation_; + DataPointer data_; + + ~IOSink(); + IOSink(const IOSink &) = delete; + + IOSink &operator=(const IOSink &) = delete; + + template <class T> IOSink &operator<<(T &&t) + { + const WriteOperationPointer operation = operation_.lock(); + if (operation) { + const Lock lock(operation->mutex_); + *operation << std::forward<T>(t); + } + return *this; + } + + template <class... A> + static IOSinkPointer + Create(A &&... a) + { + return IOSinkPointer(new IOSink(WriteOperation::Create(std::forward<A>(a)...))); + } + + void process(void); + SinkPointer branch(void); + Lock lock(void); + void abort(void); + + private: + IOSink(WriteOperationWeakPointer &&p) : operation_(std::move(p)) {} + }; + + struct Node { + typedef std::pair<size_t, bool> Result; + IOSinkPointer ioSink_; + virtual ~Node() {} + virtual Node::Result process(const TSIOBuffer) = 0; + }; + + struct StringNode : Node { + std::string string_; + explicit StringNode(std::string &&s) : string_(std::move(s)) {} + Node::Result process(const TSIOBuffer); + }; + + struct BufferNode : Node { + const TSIOBuffer buffer_; + const TSIOBufferReader reader_; + + ~BufferNode() + { + assert(reader_ != nullptr); + TSIOBufferReaderFree(reader_); + assert(buffer_ != nullptr); + TSIOBufferDestroy(buffer_); + } + + BufferNode(void) : buffer_(TSIOBufferCreate()), reader_(TSIOBufferReaderAlloc(buffer_)) + { + assert(buffer_ != nullptr); + assert(reader_ != nullptr); + } + + BufferNode(const BufferNode &) = delete; + BufferNode &operator=(const BufferNode &) = delete; + BufferNode &operator<<(const TSIOBufferReader); + BufferNode &operator<<(const ReaderSize &); + BufferNode &operator<<(const ReaderOffset &); + BufferNode &operator<<(const char *const); + BufferNode &operator<<(const std::string &); + Node::Result process(const TSIOBuffer); + }; + + struct Data : Node { + Nodes nodes_; + IOSinkPointer root_; + bool first_; + + template <class T> Data(T &&t) : root_(std::forward<T>(t)), first_(false) {} + Data(const Data &) = delete; + Data &operator=(const Data &) = delete; + + Node::Result process(const TSIOBuffer); + }; + + struct Sink { + DataPointer data_; + + ~Sink(); + + template <class... A> Sink(A &&... a) : data_(std::forward<A>(a)...) {} + Sink(const Sink &) = delete; + Sink &operator=(const Sink &) = delete; + + SinkPointer branch(void); + + Sink &operator<<(std::string &&); + + template <class T> Sink &operator<<(T &&t) + { + if (data_) { + const Lock lock = data_->root_->lock(); + assert(data_->root_ != nullptr); + const bool empty = data_->nodes_.empty(); + if (data_->first_ && empty) { + // TSDebug(PLUGIN_TAG, "flushing"); + assert(data_->root_); + *data_->root_ << std::forward<T>(t); + } else { + // TSDebug(PLUGIN_TAG, "buffering"); + BufferNode *buffer = nullptr; + if (!empty) { + buffer = dynamic_cast<BufferNode *>(data_->nodes_.back().get()); + } + if (buffer == nullptr) { + data_->nodes_.emplace_back(new BufferNode()); + buffer = reinterpret_cast<BufferNode *>(data_->nodes_.back().get()); + } + assert(buffer != nullptr); + *buffer << std::forward<T>(t); + } + } + return *this; + } + }; + +} // end of io namespace +} // end of ats namespace + +#endif // TS_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/util.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/inliner/util.h b/plugins/experimental/inliner/util.h new file mode 100644 index 0000000..2fcc42e --- /dev/null +++ b/plugins/experimental/inliner/util.h @@ -0,0 +1,43 @@ +/** @file + + Inlines base64 images from the ATS cache + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef UTIL_H +#define UTIL_H + +#include <vector> + +#define DISALLOW_COPY_AND_ASSIGN(T) \ +private: \ + T(const T &); \ + void operator=(const T &) + +#define DISALLOW_IMPLICIT_CONSTRUCTORS(T) \ +private: \ + T(void); \ + DISALLOW_COPY_AND_ASSIGN(T) + +namespace util +{ +typedef std::vector<char> Buffer; +} // end of util namespace + +#endif // UTIL_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7b8417cc/plugins/experimental/inliner/vconnection.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/inliner/vconnection.h b/plugins/experimental/inliner/vconnection.h new file mode 100644 index 0000000..026c41f --- /dev/null +++ b/plugins/experimental/inliner/vconnection.h @@ -0,0 +1,105 @@ +/** @file + + Inlines base64 images from the ATS cache + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef VCONNECTION_H +#define VCONNECTION_H + +#include <assert.h> + +#include "ts.h" + +namespace ats +{ +namespace io +{ + namespace vconnection + { + template <class T> class Read + { + typedef Read<T> Self; + TSVConn vconnection_; + io::IO in_; + T t_; + + Read(TSVConn v, T &&t, const int64_t s) : vconnection_(v), t_(std::forward<T>(t)) + { + assert(vconnection_ != nullptr); + TSCont continuation = TSContCreate(Self::handleRead, nullptr); + assert(continuation != nullptr); + TSContDataSet(continuation, this); + in_.vio = TSVConnRead(vconnection_, continuation, in_.buffer, s); + } + + static void + close(Self *const s) + { + assert(s != nullptr); + TSIOBufferReaderConsume(s->in_.reader, TSIOBufferReaderAvail(s->in_.reader)); + assert(s->vconnection_ != nullptr); + TSVConnShutdown(s->vconnection_, 1, 1); + TSVConnClose(s->vconnection_); + delete s; + } + + static int + handleRead(TSCont c, TSEvent e, void *) + { + Self *const self = static_cast<Self *const>(TSContDataGet(c)); + assert(self != nullptr); + switch (e) { + case TS_EVENT_VCONN_EOS: + case TS_EVENT_VCONN_READ_COMPLETE: + case TS_EVENT_VCONN_READ_READY: { + const int64_t available = TSIOBufferReaderAvail(self->in_.reader); + if (available > 0) { + self->t_.data(self->in_.reader); + TSIOBufferReaderConsume(self->in_.reader, available); + } + if (e == TS_EVENT_VCONN_READ_COMPLETE || e == TS_EVENT_VCONN_EOS) { + self->t_.done(); + close(self); + TSContDataSet(c, nullptr); + TSContDestroy(c); + } + } break; + default: + assert(false); // UNRECHEABLE. + break; + } + return TS_SUCCESS; + } + + template <class U> friend void read(TSVConn, U &&, const int64_t); + }; + + template <class C> + void + read(TSVConn v, C &&c, const int64_t s) + { + new Read<C>(v, std::forward<C>(c), s); + } + + } // end of vconnection namespace +} // end of io namespace +} // end of ats namespace + +#endif // VCONNECTION_H
