http://git-wip-us.apache.org/repos/asf/qpid-site/blob/59433afd/content/releases/qpid-proton-0.13.0/proton/cpp/api/mt_2epoll_container_8cpp-example.html ---------------------------------------------------------------------- diff --git a/content/releases/qpid-proton-0.13.0/proton/cpp/api/mt_2epoll_container_8cpp-example.html b/content/releases/qpid-proton-0.13.0/proton/cpp/api/mt_2epoll_container_8cpp-example.html index 70b42a9..806e954 100755 --- a/content/releases/qpid-proton-0.13.0/proton/cpp/api/mt_2epoll_container_8cpp-example.html +++ b/content/releases/qpid-proton-0.13.0/proton/cpp/api/mt_2epoll_container_8cpp-example.html @@ -3,7 +3,7 @@ <head> <meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/> <meta http-equiv="X-UA-Compatible" content="IE=9"/> -<meta name="generator" content="Doxygen 1.8.10"/> +<meta name="generator" content="Doxygen 1.8.11"/> <title>Qpid Proton C++ API: mt/epoll_container.cpp</title> <link href="tabs.css" rel="stylesheet" type="text/css"/> <script type="text/javascript" src="jquery.js"></script> @@ -55,7 +55,7 @@ </table> </div> <!-- end header part --> -<!-- Generated by Doxygen 1.8.10 --> +<!-- Generated by Doxygen 1.8.11 --> <script type="text/javascript"> var searchBox = new SearchBox("searchBox", "search",false,'Search'); </script> @@ -94,539 +94,14 @@ $(document).ready(function(){initNavTree('mt_2epoll_container_8cpp-example.html' </div><!--header--> <div class="contents"> <p>An example implementation of the <a class="el" href="classproton_1_1container.html" title="A top-level container of connections, sessions, senders, and receivers. ">proton::container</a> API that shows how to use the <a class="el" href="classproton_1_1io_1_1connection__engine.html" title="Experimental - An AMQP protocol engine for a single connection. ">proton::io::connection_engine</a> SPI to adapt the proton API to native IO, in this case using a multithreaded Linux epoll poller as the implementation.<b>Requires C++11</b></p> -<div class="fragment"><div class="line"><span class="comment">/*</span></div> -<div class="line"><span class="comment"> * Licensed to the Apache Software Foundation (ASF) under one</span></div> -<div class="line"><span class="comment"> * or more contributor license agreements. See the NOTICE file</span></div> -<div class="line"><span class="comment"> * distributed with this work for additional information</span></div> -<div class="line"><span class="comment"> * regarding copyright ownership. The ASF licenses this file</span></div> -<div class="line"><span class="comment"> * to you under the Apache License, Version 2.0 (the</span></div> -<div class="line"><span class="comment"> * "License"); you may not use this file except in compliance</span></div> -<div class="line"><span class="comment"> * with the License. You may obtain a copy of the License at</span></div> -<div class="line"><span class="comment"> *</span></div> -<div class="line"><span class="comment"> * http://www.apache.org/licenses/LICENSE-2.0</span></div> -<div class="line"><span class="comment"> *</span></div> -<div class="line"><span class="comment"> * Unless required by applicable law or agreed to in writing,</span></div> -<div class="line"><span class="comment"> * software distributed under the License is distributed on an</span></div> -<div class="line"><span class="comment"> * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span></div> -<div class="line"><span class="comment"> * KIND, either express or implied. See the License for the</span></div> -<div class="line"><span class="comment"> * specific language governing permissions and limitations</span></div> -<div class="line"><span class="comment"> * under the License.</span></div> -<div class="line"><span class="comment"> */</span></div> -<div class="line"></div> -<div class="line"><span class="preprocessor">#include "mt_container.hpp"</span></div> -<div class="line"></div> -<div class="line"><span class="preprocessor">#include <proton/default_container.hpp></span></div> -<div class="line"><span class="preprocessor">#include <proton/event_loop.hpp></span></div> -<div class="line"><span class="preprocessor">#include <proton/listen_handler.hpp></span></div> -<div class="line"><span class="preprocessor">#include <proton/url.hpp></span></div> -<div class="line"></div> -<div class="line"><span class="preprocessor">#include <proton/io/container_impl_base.hpp></span></div> -<div class="line"><span class="preprocessor">#include <proton/io/connection_engine.hpp></span></div> -<div class="line"></div> -<div class="line"><span class="preprocessor">#include <atomic></span></div> -<div class="line"><span class="preprocessor">#include <memory></span></div> -<div class="line"><span class="preprocessor">#include <mutex></span></div> -<div class="line"><span class="preprocessor">#include <condition_variable></span></div> -<div class="line"><span class="preprocessor">#include <thread></span></div> -<div class="line"><span class="preprocessor">#include <set></span></div> -<div class="line"><span class="preprocessor">#include <sstream></span></div> -<div class="line"><span class="preprocessor">#include <system_error></span></div> -<div class="line"></div> -<div class="line"><span class="comment">// Linux native IO</span></div> -<div class="line"><span class="preprocessor">#include <assert.h></span></div> -<div class="line"><span class="preprocessor">#include <errno.h></span></div> -<div class="line"><span class="preprocessor">#include <fcntl.h></span></div> -<div class="line"><span class="preprocessor">#include <netdb.h></span></div> -<div class="line"><span class="preprocessor">#include <sys/epoll.h></span></div> -<div class="line"><span class="preprocessor">#include <sys/eventfd.h></span></div> -<div class="line"><span class="preprocessor">#include <unistd.h></span></div> -<div class="line"></div> -<div class="line"><span class="preprocessor">#include "../fake_cpp11.hpp"</span></div> -<div class="line"></div> -<div class="line"><span class="comment">// Private implementation</span></div> -<div class="line"><span class="keyword">namespace </span>{</div> -<div class="line"></div> -<div class="line"></div> -<div class="line"><span class="keyword">using</span> lock_guard = std::lock_guard<std::mutex>;</div> -<div class="line"></div> -<div class="line"><span class="comment">// Get string from errno</span></div> -<div class="line">std::string errno_str(<span class="keyword">const</span> std::string& msg) {</div> -<div class="line"> <span class="keywordflow">return</span> std::system_error(errno, std::system_category(), msg).what();</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="comment">// Throw proton::error(errno_str(msg)) if result < 0</span></div> -<div class="line"><span class="keywordtype">int</span> check(<span class="keywordtype">int</span> result, <span class="keyword">const</span> std::string& msg) {</div> -<div class="line"> <span class="keywordflow">if</span> (result < 0)</div> -<div class="line"> <span class="keywordflow">throw</span> <a name="_a0"></a><a class="code" href="structproton_1_1error.html">proton::error</a>(errno_str(msg));</div> -<div class="line"> <span class="keywordflow">return</span> result;</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="comment">// Wrapper for getaddrinfo() that cleans up in destructor.</span></div> -<div class="line"><span class="keyword">class </span>unique_addrinfo {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> unique_addrinfo(<span class="keyword">const</span> std::string& addr) : addrinfo_(0) {</div> -<div class="line"> <a name="_a1"></a><a class="code" href="classproton_1_1url.html">proton::url</a> u(addr);</div> -<div class="line"> <span class="keywordtype">int</span> result = ::getaddrinfo(char_p(u.<a name="a2"></a><a class="code" href="classproton_1_1url.html#ab85a9642d69a48c9fa5d3a2906d52a9e">host</a>()), char_p(u.<a name="a3"></a><a class="code" href="classproton_1_1url.html#a0e60714e27670ed3bb5ab96715b8b740">port</a>()), 0, &addrinfo_);</div> -<div class="line"> <span class="keywordflow">if</span> (result)</div> -<div class="line"> <span class="keywordflow">throw</span> <a class="code" href="structproton_1_1error.html">proton::error</a>(std::string(<span class="stringliteral">"bad address: "</span>) + gai_strerror(result));</div> -<div class="line"> }</div> -<div class="line"> ~unique_addrinfo() { <span class="keywordflow">if</span> (addrinfo_) ::freeaddrinfo(addrinfo_); }</div> -<div class="line"></div> -<div class="line"> ::addrinfo* operator->()<span class="keyword"> const </span>{ <span class="keywordflow">return</span> addrinfo_; }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"> <span class="keyword">static</span> <span class="keyword">const</span> <span class="keywordtype">char</span>* char_p(<span class="keyword">const</span> std::string& s) { <span class="keywordflow">return</span> s.empty() ? 0 : s.c_str(); }</div> -<div class="line"> ::addrinfo *addrinfo_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"><span class="comment">// File descriptor wrapper that calls ::close in destructor.</span></div> -<div class="line"><span class="keyword">class </span>unique_fd {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> unique_fd(<span class="keywordtype">int</span> fd) : fd_(fd) {}</div> -<div class="line"> ~unique_fd() { <span class="keywordflow">if</span> (fd_ >= 0) ::close(fd_); }</div> -<div class="line"> <span class="keyword">operator</span> int()<span class="keyword"> const </span>{ <span class="keywordflow">return</span> fd_; }</div> -<div class="line"> <span class="keywordtype">int</span> release() { <span class="keywordtype">int</span> ret = fd_; fd_ = -1; <span class="keywordflow">return</span> ret; }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">protected</span>:</div> -<div class="line"> <span class="keywordtype">int</span> fd_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"><span class="keyword">class </span>pollable;</div> -<div class="line"><span class="keyword">class </span>pollable_engine;</div> -<div class="line"><span class="keyword">class </span>pollable_listener;</div> -<div class="line"></div> -<div class="line"><span class="keyword">class </span>epoll_container : <span class="keyword">public</span> <a name="_a4"></a><a class="code" href="classproton_1_1io_1_1container__impl__base.html">proton::io::container_impl_base</a> {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> epoll_container(<span class="keyword">const</span> std::string& <span class="keywordtype">id</span>);</div> -<div class="line"> ~epoll_container();</div> -<div class="line"></div> -<div class="line"> proton::returned<proton::connection> <a name="a5"></a><a class="code" href="classproton_1_1container.html#ae8b4eb94c7de3a287665156a768de8dd">connect</a>(</div> -<div class="line"> <span class="keyword">const</span> std::string& addr, <span class="keyword">const</span> <a name="_a6"></a><a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a>& opts) OVERRIDE;</div> -<div class="line"></div> -<div class="line"> <a name="_a7"></a><a class="code" href="classproton_1_1listener.html">proton::listener</a> <a name="a8"></a><a class="code" href="classproton_1_1container.html#a27d60b6665f37220d9be70f11ab69654">listen</a>(<span class="keyword">const</span> std::string& addr, <a name="_a9"></a><a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>&) OVERRIDE;</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">void</span> stop_listening(<span class="keyword">const</span> std::string& addr) OVERRIDE;</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">void</span> <a name="a10"></a><a class="code" href="classproton_1_1container.html#a18954417d3124a8095783ea13dc6d00b">run</a>() OVERRIDE;</div> -<div class="line"> <span class="keywordtype">void</span> <a name="a11"></a><a class="code" href="classproton_1_1container.html#a74bcb386eb5f833bceb6ec86caf1d546">auto_stop</a>(<span class="keywordtype">bool</span>) OVERRIDE;</div> -<div class="line"> <span class="keywordtype">void</span> <a name="a12"></a><a class="code" href="classproton_1_1container.html#a40a47814c7196a4c796015fd5c16b542">stop</a>(<span class="keyword">const</span> <a name="_a13"></a><a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>& err) OVERRIDE;</div> -<div class="line"></div> -<div class="line"> std::string <a name="a14"></a><a class="code" href="classproton_1_1container.html#a8ebfbb187faf8c69d4283fd836ddf2ea">id</a>() <span class="keyword">const</span> OVERRIDE { <span class="keywordflow">return</span> id_; }</div> -<div class="line"></div> -<div class="line"> <span class="comment">// Functions used internally.</span></div> -<div class="line"> <a name="_a15"></a><a class="code" href="classproton_1_1connection.html">proton::connection</a> add_engine(<a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> opts, <span class="keywordtype">int</span> fd, <span class="keywordtype">bool</span> server);</div> -<div class="line"> <span class="keywordtype">void</span> erase(pollable*);</div> -<div class="line"></div> -<div class="line"> <span class="comment">// Link names must be unique per container.</span></div> -<div class="line"> <span class="comment">// Generate unique names with a simple atomic counter.</span></div> -<div class="line"> <span class="keyword">class </span>atomic_link_namer : <span class="keyword">public</span> <a name="_a16"></a><a class="code" href="classproton_1_1io_1_1link__namer.html">proton::io::link_namer</a> {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> std::string link_name() {</div> -<div class="line"> std::ostringstream o;</div> -<div class="line"> o << std::hex << ++count_;</div> -<div class="line"> <span class="keywordflow">return</span> o.str();</div> -<div class="line"> }</div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"> std::atomic<uint64_t> count_;</div> -<div class="line"> };</div> -<div class="line"></div> -<div class="line"> atomic_link_namer link_namer;</div> -<div class="line"></div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"> <span class="keyword">template</span> <<span class="keyword">class</span> T> <span class="keywordtype">void</span> store(T& v, <span class="keyword">const</span> T& x)<span class="keyword"> const </span>{ lock_guard g(lock_); v = x; }</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">void</span> idle_check(<span class="keyword">const</span> lock_guard&);</div> -<div class="line"> <span class="keywordtype">void</span> interrupt();</div> -<div class="line"> <span class="keywordtype">void</span> wait();</div> -<div class="line"></div> -<div class="line"> <span class="keyword">const</span> std::string id_;</div> -<div class="line"> <span class="keyword">const</span> unique_fd epoll_fd_;</div> -<div class="line"> <span class="keyword">const</span> unique_fd interrupt_fd_;</div> -<div class="line"></div> -<div class="line"> <span class="keyword">mutable</span> std::mutex lock_;</div> -<div class="line"></div> -<div class="line"> <a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> options_;</div> -<div class="line"> std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;</div> -<div class="line"> std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;</div> -<div class="line"></div> -<div class="line"> std::condition_variable stopped_;</div> -<div class="line"> <span class="keywordtype">bool</span> stopping_;</div> -<div class="line"> <a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a> stop_err_;</div> -<div class="line"> std::atomic<size_t> threads_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"><span class="comment">// Base class for pollable file-descriptors. Manages epoll interaction,</span></div> -<div class="line"><span class="comment">// subclasses implement virtual work() to do their serialized work.</span></div> -<div class="line"><span class="keyword">class </span>pollable {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> pollable(<span class="keywordtype">int</span> fd, <span class="keywordtype">int</span> epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(<span class="keyword">false</span>), working_(<span class="keyword">false</span>)</div> -<div class="line"> {</div> -<div class="line"> <span class="keywordtype">int</span> flags = check(::fcntl(fd, F_GETFL, 0), <span class="stringliteral">"non-blocking"</span>);</div> -<div class="line"> check(::fcntl(fd, F_SETFL, flags | O_NONBLOCK), <span class="stringliteral">"non-blocking"</span>);</div> -<div class="line"> ::epoll_event ev = {};</div> -<div class="line"> ev.data.ptr = <span class="keyword">this</span>;</div> -<div class="line"> ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev);</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">virtual</span> ~pollable() {</div> -<div class="line"> ::epoll_event ev = {};</div> -<div class="line"> ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); <span class="comment">// Ignore errors.</span></div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">bool</span> do_work(uint32_t events) {</div> -<div class="line"> {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">if</span> (working_)</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>; <span class="comment">// Another thread is already working.</span></div> -<div class="line"> working_ = <span class="keyword">true</span>;</div> -<div class="line"> notified_ = <span class="keyword">false</span>;</div> -<div class="line"> }</div> -<div class="line"> uint32_t new_events = work(events); <span class="comment">// Serialized, outside the lock.</span></div> -<div class="line"> <span class="keywordflow">if</span> (new_events) {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> rearm(notified_ ? EPOLLIN|EPOLLOUT : new_events);</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">return</span> new_events;</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="comment">// Called from any thread to wake up the connection handler.</span></div> -<div class="line"> <span class="keywordtype">void</span> notify() {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">if</span> (!notified_) {</div> -<div class="line"> notified_ = <span class="keyword">true</span>;</div> -<div class="line"> <span class="keywordflow">if</span> (!working_) <span class="comment">// No worker thread, rearm now.</span></div> -<div class="line"> rearm(EPOLLIN|EPOLLOUT);</div> -<div class="line"> }</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">protected</span>:</div> -<div class="line"></div> -<div class="line"> <span class="comment">// Subclass implements work.</span></div> -<div class="line"> <span class="comment">// Returns epoll events to re-enable or 0 if finished.</span></div> -<div class="line"> <span class="keyword">virtual</span> uint32_t work(uint32_t events) = 0;</div> -<div class="line"></div> -<div class="line"> <span class="keyword">const</span> unique_fd fd_;</div> -<div class="line"> <span class="keyword">const</span> <span class="keywordtype">int</span> epoll_fd_;</div> -<div class="line"></div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">void</span> rearm(uint32_t events) {</div> -<div class="line"> epoll_event ev;</div> -<div class="line"> ev.data.ptr = <span class="keyword">this</span>;</div> -<div class="line"> ev.events = EPOLLONESHOT | events;</div> -<div class="line"> check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), <span class="stringliteral">"re-arm epoll"</span>);</div> -<div class="line"> working_ = <span class="keyword">false</span>;</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> std::mutex lock_;</div> -<div class="line"> <span class="keywordtype">bool</span> notified_;</div> -<div class="line"> <span class="keywordtype">bool</span> working_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"><span class="keyword">class </span>epoll_event_loop : <span class="keyword">public</span> <a name="_a17"></a><a class="code" href="classproton_1_1event__loop.html">proton::event_loop</a> {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> <span class="keyword">typedef</span> std::vector<std::function<void()> > jobs;</div> -<div class="line"></div> -<div class="line"> epoll_event_loop(pollable& p) : pollable_(p), closed_(<span class="keyword">false</span>) {}</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">bool</span> <a name="a18"></a><a class="code" href="classproton_1_1event__loop.html#ad179f7deaa1d11613e23db0e64b4c9b0">inject</a>(std::function<<span class="keywordtype">void</span>()> f) OVERRIDE {</div> -<div class="line"> <span class="comment">// Note this is an unbounded work queue.</span></div> -<div class="line"> <span class="comment">// A resource-safe implementation should be bounded.</span></div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">if</span> (closed_)</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">false</span>;</div> -<div class="line"> jobs_.push_back(f);</div> -<div class="line"> pollable_.notify();</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>;</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">bool</span> <a class="code" href="classproton_1_1event__loop.html#ad179f7deaa1d11613e23db0e64b4c9b0">inject</a>(<a name="_a19"></a><a class="code" href="classproton_1_1inject__handler.html">proton::inject_handler</a>& h) OVERRIDE {</div> -<div class="line"> <span class="keywordflow">return</span> <a class="code" href="classproton_1_1event__loop.html#ad179f7deaa1d11613e23db0e64b4c9b0">inject</a>(std::bind(&<a name="a20"></a><a class="code" href="classproton_1_1inject__handler.html#a9edc8a337487cf4ddb601a55f37c324a">proton::inject_handler::on_inject</a>, &h));</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> jobs pop_all() {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">return</span> std::move(jobs_);</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">void</span> close() {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> closed_ = <span class="keyword">true</span>;</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"> std::mutex lock_;</div> -<div class="line"> pollable& pollable_;</div> -<div class="line"> jobs jobs_;</div> -<div class="line"> <span class="keywordtype">bool</span> closed_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"><span class="comment">// Handle epoll wakeups for a connection_engine.</span></div> -<div class="line"><span class="keyword">class </span>pollable_engine : <span class="keyword">public</span> pollable {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> pollable_engine(epoll_container& c, <span class="keywordtype">int</span> fd, <span class="keywordtype">int</span> epoll_fd) :</div> -<div class="line"> pollable(fd, epoll_fd),</div> -<div class="line"> loop_(<span class="keyword">new</span> epoll_event_loop(*<span class="keyword">this</span>)),</div> -<div class="line"> engine_(c, c.link_namer, loop_) {}</div> -<div class="line"></div> -<div class="line"> ~pollable_engine() {</div> -<div class="line"> loop_->close(); <span class="comment">// No calls to notify() after this.</span></div> -<div class="line"> engine_.dispatch(); <span class="comment">// Run any final events.</span></div> -<div class="line"> <span class="keywordflow">try</span> { write(); } <span class="keywordflow">catch</span>(...) {} <span class="comment">// Write connection close if we can.</span></div> -<div class="line"> <span class="keywordflow">for</span> (<span class="keyword">auto</span> f : loop_->pop_all()) {<span class="comment">// Run final queued work for side-effects.</span></div> -<div class="line"> <span class="keywordflow">try</span> { f(); } <span class="keywordflow">catch</span>(...) {}</div> -<div class="line"> }</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> uint32_t work(uint32_t events) {</div> -<div class="line"> <span class="keywordflow">try</span> {</div> -<div class="line"> <span class="keywordtype">bool</span> can_read = events & EPOLLIN, can_write = events && EPOLLOUT;</div> -<div class="line"> <span class="keywordflow">do</span> {</div> -<div class="line"> can_write = can_write && write();</div> -<div class="line"> can_read = can_read && read();</div> -<div class="line"> <span class="keywordflow">for</span> (<span class="keyword">auto</span> f : loop_->pop_all()) <span class="comment">// Run queued work</span></div> -<div class="line"> f();</div> -<div class="line"> engine_.dispatch();</div> -<div class="line"> } <span class="keywordflow">while</span> (can_read || can_write);</div> -<div class="line"> <span class="keywordflow">return</span> (engine_.read_buffer().size ? EPOLLIN:0) |</div> -<div class="line"> (engine_.write_buffer().size ? EPOLLOUT:0);</div> -<div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception& e) {</div> -<div class="line"> engine_.disconnected(<a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>(<span class="stringliteral">"exception"</span>, e.what()));</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">return</span> 0; <span class="comment">// Ending</span></div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <a name="_a21"></a><a class="code" href="classproton_1_1io_1_1connection__engine.html">proton::io::connection_engine</a>& engine() { <span class="keywordflow">return</span> engine_; }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">bool</span> write() {</div> -<div class="line"> <span class="keywordflow">if</span> (engine_.write_buffer().size) {</div> -<div class="line"> ssize_t n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);</div> -<div class="line"> <span class="keywordflow">while</span> (n == EINTR)</div> -<div class="line"> n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);</div> -<div class="line"> <span class="keywordflow">if</span> (n > 0) {</div> -<div class="line"> engine_.<a name="a22"></a><a class="code" href="classproton_1_1io_1_1connection__engine.html#aedf05de753467bf31b70db61c8effc80">write_done</a>(n);</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>;</div> -<div class="line"> } <span class="keywordflow">else</span> <span class="keywordflow">if</span> (errno != EAGAIN && errno != EWOULDBLOCK)</div> -<div class="line"> check(n, <span class="stringliteral">"write"</span>);</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">false</span>;</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="keywordtype">bool</span> read() {</div> -<div class="line"> <span class="keywordflow">if</span> (engine_.read_buffer().size) {</div> -<div class="line"> ssize_t n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);</div> -<div class="line"> <span class="keywordflow">while</span> (n == EINTR)</div> -<div class="line"> n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);</div> -<div class="line"> <span class="keywordflow">if</span> (n > 0) {</div> -<div class="line"> engine_.read_done(n);</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>;</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (n == 0)</div> -<div class="line"> engine_.read_close();</div> -<div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (errno != EAGAIN && errno != EWOULDBLOCK)</div> -<div class="line"> check(n, <span class="stringliteral">"read"</span>);</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">return</span> <span class="keyword">false</span>;</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> <span class="comment">// Lifecycle note: loop_ belongs to the proton::connection, which can live</span></div> -<div class="line"> <span class="comment">// longer than the engine if the application holds a reference to it, we</span></div> -<div class="line"> <span class="comment">// disconnect ourselves with loop_->close() in ~connection_engine()</span></div> -<div class="line"> epoll_event_loop* loop_;</div> -<div class="line"> <a class="code" href="classproton_1_1io_1_1connection__engine.html">proton::io::connection_engine</a> engine_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"><span class="comment">// A pollable listener fd that creates pollable_engine for incoming connections.</span></div> -<div class="line"><span class="keyword">class </span>pollable_listener : <span class="keyword">public</span> pollable {</div> -<div class="line"> <span class="keyword">public</span>:</div> -<div class="line"> pollable_listener(</div> -<div class="line"> <span class="keyword">const</span> std::string& addr,</div> -<div class="line"> <a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>& l,</div> -<div class="line"> <span class="keywordtype">int</span> epoll_fd,</div> -<div class="line"> epoll_container& c</div> -<div class="line"> ) :</div> -<div class="line"> pollable(socket_listen(addr), epoll_fd),</div> -<div class="line"> addr_(addr),</div> -<div class="line"> container_(c),</div> -<div class="line"> listener_(l)</div> -<div class="line"> {}</div> -<div class="line"></div> -<div class="line"> uint32_t work(uint32_t events) {</div> -<div class="line"> <span class="keywordflow">if</span> (events & EPOLLRDHUP) {</div> -<div class="line"> <span class="keywordflow">try</span> { listener_.on_close(); } <span class="keywordflow">catch</span> (...) {}</div> -<div class="line"> <span class="keywordflow">return</span> 0;</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">try</span> {</div> -<div class="line"> <span class="keywordtype">int</span> accepted = check(::accept(fd_, NULL, 0), <span class="stringliteral">"accept"</span>);</div> -<div class="line"> container_.add_engine(listener_.on_accept(), accepted, <span class="keyword">true</span>);</div> -<div class="line"> <span class="keywordflow">return</span> EPOLLIN;</div> -<div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception& e) {</div> -<div class="line"> listener_.on_error(e.what());</div> -<div class="line"> <span class="keywordflow">return</span> 0;</div> -<div class="line"> }</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> std::string addr() { <span class="keywordflow">return</span> addr_; }</div> -<div class="line"></div> -<div class="line"> <span class="keyword">private</span>:</div> -<div class="line"></div> -<div class="line"> <span class="keyword">static</span> <span class="keywordtype">int</span> socket_listen(<span class="keyword">const</span> std::string& addr) {</div> -<div class="line"> std::string msg = <span class="stringliteral">"listen on "</span>+addr;</div> -<div class="line"> unique_addrinfo ainfo(addr);</div> -<div class="line"> unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));</div> -<div class="line"> <span class="keywordtype">int</span> yes = 1;</div> -<div class="line"> check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, <span class="keyword">sizeof</span>(yes)), msg);</div> -<div class="line"> check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);</div> -<div class="line"> check(::listen(fd, 32), msg);</div> -<div class="line"> <span class="keywordflow">return</span> fd.release();</div> -<div class="line"> }</div> -<div class="line"></div> -<div class="line"> std::string addr_;</div> -<div class="line"> std::function<proton::connection_options(const std::string&)> factory_;</div> -<div class="line"> epoll_container& container_;</div> -<div class="line"> <a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> opts_;</div> -<div class="line"> <a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>& listener_;</div> -<div class="line">};</div> -<div class="line"></div> -<div class="line"></div> -<div class="line">epoll_container::epoll_container(<span class="keyword">const</span> std::string& <span class="keywordtype">id</span>)</div> -<div class="line"> : id_(<span class="keywordtype">id</span>), epoll_fd_(check(epoll_create(1), <span class="stringliteral">"epoll_create"</span>)),</div> -<div class="line"> interrupt_fd_(check(eventfd(1, 0), <span class="stringliteral">"eventfd"</span>)),</div> -<div class="line"> stopping_(<span class="keyword">false</span>), threads_(0)</div> -<div class="line">{}</div> -<div class="line"></div> -<div class="line">epoll_container::~epoll_container() {</div> -<div class="line"> <span class="keywordflow">try</span> {</div> -<div class="line"> stop(<a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>(<span class="stringliteral">"exception"</span>, <span class="stringliteral">"container shut-down"</span>));</div> -<div class="line"> wait();</div> -<div class="line"> } <span class="keywordflow">catch</span> (...) {}</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><a class="code" href="classproton_1_1connection.html">proton::connection</a> epoll_container::add_engine(<a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> opts, <span class="keywordtype">int</span> fd, <span class="keywordtype">bool</span> server)</div> -<div class="line">{</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">if</span> (stopping_)</div> -<div class="line"> <span class="keywordflow">throw</span> <a class="code" href="structproton_1_1error.html">proton::error</a>(<span class="stringliteral">"container is stopping"</span>);</div> -<div class="line"> std::unique_ptr<pollable_engine> eng(<span class="keyword">new</span> pollable_engine(*<span class="keyword">this</span>, fd, epoll_fd_));</div> -<div class="line"> <span class="keywordflow">if</span> (server)</div> -<div class="line"> eng->engine().accept(opts);</div> -<div class="line"> <span class="keywordflow">else</span></div> -<div class="line"> eng->engine().connect(opts);</div> -<div class="line"> <a class="code" href="classproton_1_1connection.html">proton::connection</a> c = eng->engine().connection();</div> -<div class="line"> eng->notify();</div> -<div class="line"> engines_[eng.get()] = std::move(eng);</div> -<div class="line"> <span class="keywordflow">return</span> c;</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::erase(pollable* e) {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">if</span> (!engines_.erase(e)) {</div> -<div class="line"> pollable_listener* l = <span class="keyword">dynamic_cast<</span>pollable_listener*<span class="keyword">></span>(e);</div> -<div class="line"> <span class="keywordflow">if</span> (l)</div> -<div class="line"> listeners_.erase(l->addr());</div> -<div class="line"> }</div> -<div class="line"> idle_check(g);</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::idle_check(<span class="keyword">const</span> lock_guard&) {</div> -<div class="line"> <span class="keywordflow">if</span> (stopping_ && engines_.empty() && listeners_.empty())</div> -<div class="line"> interrupt();</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line">proton::returned<proton::connection> epoll_container::connect(</div> -<div class="line"> <span class="keyword">const</span> std::string& addr, <span class="keyword">const</span> <a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a>& opts)</div> -<div class="line">{</div> -<div class="line"> std::string msg = <span class="stringliteral">"connect to "</span>+addr;</div> -<div class="line"> unique_addrinfo ainfo(addr);</div> -<div class="line"> unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));</div> -<div class="line"> check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);</div> -<div class="line"> <span class="keywordflow">return</span> <a class="code" href="namespaceproton.html#a8cdc51abe42497af9f9b2966d1abdfa4">make_thread_safe</a>(add_engine(opts, fd.release(), <span class="keyword">false</span>));</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><a class="code" href="classproton_1_1listener.html">proton::listener</a> epoll_container::listen(<span class="keyword">const</span> std::string& addr, <a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>& lh) {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> <span class="keywordflow">if</span> (stopping_)</div> -<div class="line"> <span class="keywordflow">throw</span> <a class="code" href="structproton_1_1error.html">proton::error</a>(<span class="stringliteral">"container is stopping"</span>);</div> -<div class="line"> <span class="keyword">auto</span>& l = listeners_[addr];</div> -<div class="line"> <span class="keywordflow">try</span> {</div> -<div class="line"> l.reset(<span class="keyword">new</span> pollable_listener(addr, lh, epoll_fd_, *<span class="keyword">this</span>));</div> -<div class="line"> l->notify();</div> -<div class="line"> <span class="keywordflow">return</span> <a class="code" href="classproton_1_1listener.html">proton::listener</a>(*<span class="keyword">this</span>, addr);</div> -<div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception& e) {</div> -<div class="line"> lh.<a name="a23"></a><a class="code" href="classproton_1_1listen__handler.html#a1bc02e9d18e7d20dde60dee73b3889ac">on_error</a>(e.what());</div> -<div class="line"> lh.<a name="a24"></a><a class="code" href="classproton_1_1listen__handler.html#a9eb8253590ebbf23639571ddc290e64a">on_close</a>();</div> -<div class="line"> <span class="keywordflow">throw</span>;</div> -<div class="line"> }</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::stop_listening(<span class="keyword">const</span> std::string& addr) {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> listeners_.erase(addr);</div> -<div class="line"> idle_check(g);</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::run() {</div> -<div class="line"> ++threads_;</div> -<div class="line"> <span class="keywordflow">try</span> {</div> -<div class="line"> epoll_event e;</div> -<div class="line"> <span class="keywordflow">while</span>(<span class="keyword">true</span>) {</div> -<div class="line"> check(::epoll_wait(epoll_fd_, &e, 1, -1), <span class="stringliteral">"epoll_wait"</span>);</div> -<div class="line"> pollable* p = <span class="keyword">reinterpret_cast<</span>pollable*<span class="keyword">></span>(e.data.ptr);</div> -<div class="line"> <span class="keywordflow">if</span> (!p)</div> -<div class="line"> <span class="keywordflow">break</span>; <span class="comment">// Interrupted</span></div> -<div class="line"> <span class="keywordflow">if</span> (!p->do_work(e.events))</div> -<div class="line"> erase(p);</div> -<div class="line"> }</div> -<div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception& e) {</div> -<div class="line"> stop(<a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>(<span class="stringliteral">"exception"</span>, e.what()));</div> -<div class="line"> }</div> -<div class="line"> <span class="keywordflow">if</span> (--threads_ == 0)</div> -<div class="line"> stopped_.notify_all();</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::auto_stop(<span class="keywordtype">bool</span> set) {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> stopping_ = set;</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::stop(<span class="keyword">const</span> <a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>& err) {</div> -<div class="line"> lock_guard g(lock_);</div> -<div class="line"> stop_err_ = err;</div> -<div class="line"> interrupt();</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::wait() {</div> -<div class="line"> std::unique_lock<std::mutex> l(lock_);</div> -<div class="line"> stopped_.wait(l, [<span class="keyword">this</span>]() { <span class="keywordflow">return</span> this->threads_ == 0; } );</div> -<div class="line"> <span class="keywordflow">for</span> (<span class="keyword">auto</span>& eng : engines_)</div> -<div class="line"> eng.second->engine().disconnected(stop_err_);</div> -<div class="line"> listeners_.clear();</div> -<div class="line"> engines_.clear();</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="keywordtype">void</span> epoll_container::interrupt() {</div> -<div class="line"> <span class="comment">// Add an always-readable fd with 0 data and no ONESHOT to interrupt all threads.</span></div> -<div class="line"> epoll_event ev = {};</div> -<div class="line"> ev.events = EPOLLIN;</div> -<div class="line"> check(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev), <span class="stringliteral">"interrupt"</span>);</div> -<div class="line">}</div> -<div class="line"></div> -<div class="line">}</div> -<div class="line"></div> -<div class="line"><span class="comment">// This is the only public function.</span></div> -<div class="line">std::unique_ptr<proton::container> make_mt_container(<span class="keyword">const</span> std::string& <span class="keywordtype">id</span>) {</div> -<div class="line"> <span class="keywordflow">return</span> std::unique_ptr<proton::container>(<span class="keyword">new</span> epoll_container(<span class="keywordtype">id</span>));</div> -<div class="line">}</div> -</div><!-- fragment --> </div><!-- contents --> +<div class="fragment"><div class="line"><span class="comment">/*</span></div><div class="line"><span class="comment"> * Licensed to the Apache Software Foundation (ASF) under one</span></div><div class="line"><span class="comment"> * or more contributor license agreements. See the NOTICE file</span></div><div class="line"><span class="comment"> * distributed with this work for additional information</span></div><div class="line"><span class="comment"> * regarding copyright ownership. The ASF licenses this file</span></div><div class="line"><span class="comment"> * to you under the Apache License, Version 2.0 (the</span></div><div class="line"><span class="comment"> * "License"); you may not use this file except in compliance</span></div><div class="line"><span class="comment"> * with the License. You may obtain a copy of the License at</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * http://www.apache.org/li censes/LICENSE-2.0</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * Unless required by applicable law or agreed to in writing,</span></div><div class="line"><span class="comment"> * software distributed under the License is distributed on an</span></div><div class="line"><span class="comment"> * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span></div><div class="line"><span class="comment"> * KIND, either express or implied. See the License for the</span></div><div class="line"><span class="comment"> * specific language governing permissions and limitations</span></div><div class="line"><span class="comment"> * under the License.</span></div><div class="line"><span class="comment"> */</span></div><div class="line"></div><div class="line"><span class="preprocessor">#include "mt_container.hpp"</span></div><div class="line"></div><div class="line"><span class="preprocessor">#include <proton/ default_container.hpp></span></div><div class="line"><span class="preprocessor">#include <proton/event_loop.hpp></span></div><div class="line"><span class="preprocessor">#include <proton/listen_handler.hpp></span></div><div class="line"><span class="preprocessor">#include <proton/url.hpp></span></div><div class="line"></div><div class="line"><span class="preprocessor">#include <proton/io/container_impl_base.hpp></span></div><div class="line"><span class="preprocessor">#include <proton/io/connection_engine.hpp></span></div><div class="line"></div><div class="line"><span class="preprocessor">#include <atomic></span></div><div class="line"><span class="preprocessor">#include <memory></span></div><div class="line"><span class="preprocessor">#include <mutex></span></div><div class="line"><span class="preprocessor">#include <condition_variable></span></div><div class="line"><span class="preprocessor">#include <thread></span></ div><div class="line"><span class="preprocessor">#include <set></span></div><div class="line"><span class="preprocessor">#include <sstream></span></div><div class="line"><span class="preprocessor">#include <system_error></span></div><div class="line"></div><div class="line"><span class="comment">// Linux native IO</span></div><div class="line"><span class="preprocessor">#include <assert.h></span></div><div class="line"><span class="preprocessor">#include <errno.h></span></div><div class="line"><span class="preprocessor">#include <fcntl.h></span></div><div class="line"><span class="preprocessor">#include <netdb.h></span></div><div class="line"><span class="preprocessor">#include <sys/epoll.h></span></div><div class="line"><span class="preprocessor">#include <sys/eventfd.h></span></div><div class="line"><span class="preprocessor">#include <unistd.h></span></div><div class="line"></div><div class="line"><span class="preprocess or">#include "../fake_cpp11.hpp"</span></div><div class="line"></div><div class="line"><span class="comment">// Private implementation</span></div><div class="line"><span class="keyword">namespace </span>{</div><div class="line"></div><div class="line"></div><div class="line"><span class="keyword">using</span> lock_guard = std::lock_guard<std::mutex>;</div><div class="line"></div><div class="line"><span class="comment">// Get string from errno</span></div><div class="line">std::string errno_str(<span class="keyword">const</span> std::string& msg) {</div><div class="line"> <span class="keywordflow">return</span> std::system_error(errno, std::system_category(), msg).what();</div><div class="line">}</div><div class="line"></div><div class="line"><span class="comment">// Throw proton::error(errno_str(msg)) if result < 0</span></div><div class="line"><span class="keywordtype">int</span> check(<span class="keywordtype">int</span> result, <span class="keyword">co nst</span> std::string& msg) {</div><div class="line"> <span class="keywordflow">if</span> (result < 0)</div><div class="line"> <span class="keywordflow">throw</span> <a name="_a0"></a><a class="code" href="structproton_1_1error.html">proton::error</a>(errno_str(msg));</div><div class="line"> <span class="keywordflow">return</span> result;</div><div class="line">}</div><div class="line"></div><div class="line"><span class="comment">// Wrapper for getaddrinfo() that cleans up in destructor.</span></div><div class="line"><span class="keyword">class </span>unique_addrinfo {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> unique_addrinfo(<span class="keyword">const</span> std::string& addr) : addrinfo_(0) {</div><div class="line"> <a name="_a1"></a><a class="code" href="classproton_1_1url.html">proton::url</a> u(addr);</div><div class="line"> <span class="keywordtype">int</span> result = ::getaddrinfo(char_p( u.<a name="a2"></a><a class="code" href="classproton_1_1url.html#ab85a9642d69a48c9fa5d3a2906d52a9e">host</a>()), char_p(u.<a name="a3"></a><a class="code" href="classproton_1_1url.html#a0e60714e27670ed3bb5ab96715b8b740">port</a>()), 0, &addrinfo_);</div><div class="line"> <span class="keywordflow">if</span> (result)</div><div class="line"> <span class="keywordflow">throw</span> <a class="code" href="structproton_1_1error.html">proton::error</a>(std::string(<span class="stringliteral">"bad address: "</span>) + gai_strerror(result));</div><div class="line"> }</div><div class="line"> ~unique_addrinfo() { <span class="keywordflow">if</span> (addrinfo_) ::freeaddrinfo(addrinfo_); }</div><div class="line"></div><div class="line"> ::addrinfo* operator->()<span class="keyword"> const </span>{ <span class="keywordflow">return</span> addrinfo_; }</div><div class="line"></div><div class="line"> <span class="keyword">private</span>:</div><div class=" line"> <span class="keyword">static</span> <span class="keyword">const</span> <span class="keywordtype">char</span>* char_p(<span class="keyword">const</span> std::string& s) { <span class="keywordflow">return</span> s.empty() ? 0 : s.c_str(); }</div><div class="line"> ::addrinfo *addrinfo_;</div><div class="line">};</div><div class="line"></div><div class="line"><span class="comment">// File descriptor wrapper that calls ::close in destructor.</span></div><div class="line"><span class="keyword">class </span>unique_fd {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> unique_fd(<span class="keywordtype">int</span> fd) : fd_(fd) {}</div><div class="line"> ~unique_fd() { <span class="keywordflow">if</span> (fd_ >= 0) ::close(fd_); }</div><div class="line"> <span class="keyword">operator</span> int()<span class="keyword"> const </span>{ <span class="keywordflow">return</span> fd_; }</div><div class="line"> <span class="keywo rdtype">int</span> release() { <span class="keywordtype">int</span> ret = fd_; fd_ = -1; <span class="keywordflow">return</span> ret; }</div><div class="line"></div><div class="line"> <span class="keyword">protected</span>:</div><div class="line"> <span class="keywordtype">int</span> fd_;</div><div class="line">};</div><div class="line"></div><div class="line"><span class="keyword">class </span>pollable;</div><div class="line"><span class="keyword">class </span>pollable_engine;</div><div class="line"><span class="keyword">class </span>pollable_listener;</div><div class="line"></div><div class="line"><span class="keyword">class </span>epoll_container : <span class="keyword">public</span> <a name="_a4"></a><a class="code" href="classproton_1_1io_1_1container__impl__base.html">proton::io::container_impl_base</a> {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> epoll_container(<span class="keyword">const</span> std::string& <span class=" keywordtype">id</span>);</div><div class="line"> ~epoll_container();</div><div class="line"></div><div class="line"> proton::returned<proton::connection> connect(</div><div class="line"> <span class="keyword">const</span> std::string& addr, <span class="keyword">const</span> <a name="_a5"></a><a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a>& opts) OVERRIDE;</div><div class="line"></div><div class="line"> <a name="_a6"></a><a class="code" href="classproton_1_1listener.html">proton::listener</a> listen(<span class="keyword">const</span> std::string& addr, <a name="_a7"></a><a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>&) OVERRIDE;</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> stop_listening(<span class="keyword">const</span> std::string& addr) OVERRIDE;</div><div class="line"></div><div class="line"> <span class="ke ywordtype">void</span> run() OVERRIDE;</div><div class="line"> <span class="keywordtype">void</span> auto_stop(<span class="keywordtype">bool</span>) OVERRIDE;</div><div class="line"> <span class="keywordtype">void</span> stop(<span class="keyword">const</span> <a name="_a8"></a><a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>& err) OVERRIDE;</div><div class="line"></div><div class="line"> std::string id() <span class="keyword">const</span> OVERRIDE { <span class="keywordflow">return</span> id_; }</div><div class="line"></div><div class="line"> <span class="comment">// Functions used internally.</span></div><div class="line"> <a name="_a9"></a><a class="code" href="classproton_1_1connection.html">proton::connection</a> add_engine(<a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> opts, <span class="keywordtype">int</span> fd, <span class="keywordtype">bool</span> server);</div><div c lass="line"> <span class="keywordtype">void</span> erase(pollable*);</div><div class="line"></div><div class="line"> <span class="comment">// Link names must be unique per container.</span></div><div class="line"> <span class="comment">// Generate unique names with a simple atomic counter.</span></div><div class="line"> <span class="keyword">class </span>atomic_link_namer : <span class="keyword">public</span> <a name="_a10"></a><a class="code" href="classproton_1_1io_1_1link__namer.html">proton::io::link_namer</a> {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> std::string link_name() {</div><div class="line"> std::ostringstream o;</div><div class="line"> o << std::hex << ++count_;</div><div class="line"> <span class="keywordflow">return</span> o.str();</div><div class="line"> }</div><div class="line"> <span class="keyword">private</span>:</div><div class="line"> std::atomic<uint64_t> count_;</div><div class="line"> };</div><div class="line"></div><div class="line"> atomic_link_namer link_namer;</div><div class="line"></div><div class="line"> <span class="keyword">private</span>:</div><div class="line"> <span class="keyword">template</span> <<span class="keyword">class</span> T> <span class="keywordtype">void</span> store(T& v, <span class="keyword">const</span> T& x)<span class="keyword"> const </span>{ lock_guard g(lock_); v = x; }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> idle_check(<span class="keyword">const</span> lock_guard&);</div><div class="line"> <span class="keywordtype">void</span> interrupt();</div><div class="line"> <span class="keywordtype">void</span> wait();</div><div class="line"></div><div class="line"> <span class="keyword">const</span> std::string id_;</div><div class="line"> <span class="keyword">const</span> unique_fd ep oll_fd_;</div><div class="line"> <span class="keyword">const</span> unique_fd interrupt_fd_;</div><div class="line"></div><div class="line"> <span class="keyword">mutable</span> std::mutex lock_;</div><div class="line"></div><div class="line"> <a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> options_;</div><div class="line"> std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;</div><div class="line"> std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;</div><div class="line"></div><div class="line"> std::condition_variable stopped_;</div><div class="line"> <span class="keywordtype">bool</span> stopping_;</div><div class="line"> <a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a> stop_err_;</div><div class="line"> std::atomic<size_t> threads_;</div><div class="line">};</div><div class="line"></div><div class="line"><sp an class="comment">// Base class for pollable file-descriptors. Manages epoll interaction,</span></div><div class="line"><span class="comment">// subclasses implement virtual work() to do their serialized work.</span></div><div class="line"><span class="keyword">class </span>pollable {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> pollable(<span class="keywordtype">int</span> fd, <span class="keywordtype">int</span> epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(<span class="keyword">false</span>), working_(<span class="keyword">false</span>)</div><div class="line"> {</div><div class="line"> <span class="keywordtype">int</span> flags = check(::fcntl(fd, F_GETFL, 0), <span class="stringliteral">"non-blocking"</span>);</div><div class="line"> check(::fcntl(fd, F_SETFL, flags | O_NONBLOCK), <span class="stringliteral">"non-blocking"</span>);</div><div class="line"> ::epoll_event ev = {};</div>< div class="line"> ev.data.ptr = <span class="keyword">this</span>;</div><div class="line"> ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev);</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keyword">virtual</span> ~pollable() {</div><div class="line"> ::epoll_event ev = {};</div><div class="line"> ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); <span class="comment">// Ignore errors.</span></div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">bool</span> do_work(uint32_t events) {</div><div class="line"> {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywordflow">if</span> (working_)</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>; <span class="comment">// Another thread is already working.</span></div><div class="l ine"> working_ = <span class="keyword">true</span>;</div><div class="line"> notified_ = <span class="keyword">false</span>;</div><div class="line"> }</div><div class="line"> uint32_t new_events = work(events); <span class="comment">// Serialized, outside the lock.</span></div><div class="line"> <span class="keywordflow">if</span> (new_events) {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> rearm(notified_ ? EPOLLIN|EPOLLOUT : new_events);</div><div class="line"> }</div><div class="line"> <span class="keywordflow">return</span> new_events;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="comment">// Called from any thread to wake up the connection handler.</span></div><div class="line"> <span class="keywordtype">void</span> notify() {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywor dflow">if</span> (!notified_) {</div><div class="line"> notified_ = <span class="keyword">true</span>;</div><div class="line"> <span class="keywordflow">if</span> (!working_) <span class="comment">// No worker thread, rearm now.</span></div><div class="line"> rearm(EPOLLIN|EPOLLOUT);</div><div class="line"> }</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keyword">protected</span>:</div><div class="line"></div><div class="line"> <span class="comment">// Subclass implements work.</span></div><div class="line"> <span class="comment">// Returns epoll events to re-enable or 0 if finished.</span></div><div class="line"> <span class="keyword">virtual</span> uint32_t work(uint32_t events) = 0;</div><div class="line"></div><div class="line"> <span class="keyword">const</span> unique_fd fd_;</div><div class="line"> <span class="keyword">const</span> <span class="keywordtype">int</span> epo ll_fd_;</div><div class="line"></div><div class="line"> <span class="keyword">private</span>:</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> rearm(uint32_t events) {</div><div class="line"> epoll_event ev;</div><div class="line"> ev.data.ptr = <span class="keyword">this</span>;</div><div class="line"> ev.events = EPOLLONESHOT | events;</div><div class="line"> check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), <span class="stringliteral">"re-arm epoll"</span>);</div><div class="line"> working_ = <span class="keyword">false</span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> std::mutex lock_;</div><div class="line"> <span class="keywordtype">bool</span> notified_;</div><div class="line"> <span class="keywordtype">bool</span> working_;</div><div class="line">};</div><div class="line"></div><div class="line"><span class="keyword">class </span>epoll_ev ent_loop : <span class="keyword">public</span> <a name="_a11"></a><a class="code" href="classproton_1_1event__loop.html">proton::event_loop</a> {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> <span class="keyword">typedef</span> std::vector<std::function<void()> > jobs;</div><div class="line"></div><div class="line"> epoll_event_loop(pollable& p) : pollable_(p), closed_(<span class="keyword">false</span>) {}</div><div class="line"></div><div class="line"> <span class="keywordtype">bool</span> inject(std::function<<span class="keywordtype">void</span>()> f) OVERRIDE {</div><div class="line"> <span class="comment">// Note this is an unbounded work queue.</span></div><div class="line"> <span class="comment">// A resource-safe implementation should be bounded.</span></div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywordflow">if</span> (closed_)</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">false</span>;</div><div class="line"> jobs_.push_back(f);</div><div class="line"> pollable_.notify();</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">bool</span> inject(<a name="_a12"></a><a class="code" href="classproton_1_1inject__handler.html">proton::inject_handler</a>& h) OVERRIDE {</div><div class="line"> <span class="keywordflow">return</span> inject(std::bind(&<a name="a13"></a><a class="code" href="classproton_1_1inject__handler.html#a9edc8a337487cf4ddb601a55f37c324a">proton::inject_handler::on_inject</a>, &h));</div><div class="line"> }</div><div class="line"></div><div class="line"> jobs pop_all() {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywordflow">return</span> std::move(jobs_);</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> close() {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> closed_ = <span class="keyword">true</span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keyword">private</span>:</div><div class="line"> std::mutex lock_;</div><div class="line"> pollable& pollable_;</div><div class="line"> jobs jobs_;</div><div class="line"> <span class="keywordtype">bool</span> closed_;</div><div class="line">};</div><div class="line"></div><div class="line"><span class="comment">// Handle epoll wakeups for a connection_engine.</span></div><div class="line"><span class="keyword">class </span>pollable_engine : <span class="keyword">public</span> pollable {</div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> pollable_engine(epoll_container& c, <span class="keywordtype">int</span> fd, <span class="keywordtype">int</span> epoll_fd) :</div><div class="line"> pollable(fd, epoll_fd),</div><div class="line"> loop_(<span class="keyword">new</span> epoll_event_loop(*<span class="keyword">this</span>)),</div><div class="line"> engine_(c, c.link_namer, loop_) {}</div><div class="line"></div><div class="line"> ~pollable_engine() {</div><div class="line"> loop_->close(); <span class="comment">// No calls to notify() after this.</span></div><div class="line"> engine_.dispatch(); <span class="comment">// Run any final events.</span></div><div class="line"> <span class="keywordflow">try</span> { write(); } <span class="keywordflow">catch</span>(...) {} <span class="comment">// Write connection close if we can.</span></div><div class="line"> <span class="keywordflow">for</span> (<span class="keyword">auto</span> f : l oop_->pop_all()) {<span class="comment">// Run final queued work for side-effects.</span></div><div class="line"> <span class="keywordflow">try</span> { f(); } <span class="keywordflow">catch</span>(...) {}</div><div class="line"> }</div><div class="line"> }</div><div class="line"></div><div class="line"> uint32_t work(uint32_t events) {</div><div class="line"> <span class="keywordflow">try</span> {</div><div class="line"> <span class="keywordtype">bool</span> can_read = events & EPOLLIN, can_write = events && EPOLLOUT;</div><div class="line"> <span class="keywordflow">do</span> {</div><div class="line"> can_write = can_write && write();</div><div class="line"> can_read = can_read && read();</div><div class="line"> <span class="keywordflow">for</span> (<span class="keyword">auto</span> f : loop_->pop_all()) <span class="comment">// Run queued work</span ></div><div class="line"> f();</div><div class="line"> > engine_.dispatch();</div><div class="line"> } <span >class="keywordflow">while</span> (can_read || can_write);</div><div >class="line"> <span class="keywordflow">return</span> >(engine_.read_buffer().size ? EPOLLIN:0) |</div><div class="line"> > (engine_.write_buffer().size ? EPOLLOUT:0);</div><div class="line"> > } <span class="keywordflow">catch</span> (<span class="keyword">const</span> >std::exception& e) {</div><div class="line"> >engine_.disconnected(<a class="code" >href="classproton_1_1error__condition.html">proton::error_condition</a>(<span >class="stringliteral">"exception"</span>, e.what()));</div><div >class="line"> }</div><div class="line"> <span >class="keywordflow">return</span> 0; <span class="comment">// >Ending</span></div><div class="line"> }</div><div class="line"></div><div >class="line"> <a name="_a14"></a><a class="code" href="classproton_1_1io_1_1connection__engine.html">proton::io::connection_engine</a>& engine() { <span class="keywordflow">return</span> engine_; }</div><div class="line"></div><div class="line"> <span class="keyword">private</span>:</div><div class="line"></div><div class="line"> <span class="keywordtype">bool</span> write() {</div><div class="line"> <span class="keywordflow">if</span> (engine_.write_buffer().size) {</div><div class="line"> ssize_t n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);</div><div class="line"> <span class="keywordflow">while</span> (n == EINTR)</div><div class="line"> n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);</div><div class="line"> <span class="keywordflow">if</span> (n > 0) {</div><div class="line"> engine_.<a name="a15"></a><a class="code" href="classproton_1_1io_1_1connection __engine.html#aedf05de753467bf31b70db61c8effc80">write_done</a>(n);</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>;</div><div class="line"> } <span class="keywordflow">else</span> <span class="keywordflow">if</span> (errno != EAGAIN && errno != EWOULDBLOCK)</div><div class="line"> check(n, <span class="stringliteral">"write"</span>);</div><div class="line"> }</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">false</span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">bool</span> read() {</div><div class="line"> <span class="keywordflow">if</span> (engine_.read_buffer().size) {</div><div class="line"> ssize_t n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);</div><div class="line"> <span class="keywordflow">while</sp an> (n == EINTR)</div><div class="line"> n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);</div><div class="line"> <span class="keywordflow">if</span> (n > 0) {</div><div class="line"> engine_.read_done(n);</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">true</span>;</div><div class="line"> }</div><div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (n == 0)</div><div class="line"> engine_.read_close();</div><div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (errno != EAGAIN && errno != EWOULDBLOCK)</div><div class="line"> check(n, <span class="stringliteral">"read"</span>);</div><div class="line"> }</div><div class="line"> <span class="keywordflow">return</span> <span class="keyword">false< /span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="comment">// Lifecycle note: loop_ belongs to the proton::connection, which can live</span></div><div class="line"> <span class="comment">// longer than the engine if the application holds a reference to it, we</span></div><div class="line"> <span class="comment">// disconnect ourselves with loop_->close() in ~connection_engine()</span></div><div class="line"> epoll_event_loop* loop_;</div><div class="line"> <a class="code" href="classproton_1_1io_1_1connection__engine.html">proton::io::connection_engine</a> engine_;</div><div class="line">};</div><div class="line"></div><div class="line"><span class="comment">// A pollable listener fd that creates pollable_engine for incoming connections.</span></div><div class="line"><span class="keyword">class </span>pollable_listener : <span class="keyword">public</span> pollable {</div><div class="line"> <span class="keyword">publ ic</span>:</div><div class="line"> pollable_listener(</div><div class="line"> <span class="keyword">const</span> std::string& addr,</div><div class="line"> <a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>& l,</div><div class="line"> <span class="keywordtype">int</span> epoll_fd,</div><div class="line"> epoll_container& c</div><div class="line"> ) :</div><div class="line"> pollable(socket_listen(addr), epoll_fd),</div><div class="line"> addr_(addr),</div><div class="line"> container_(c),</div><div class="line"> listener_(l)</div><div class="line"> {}</div><div class="line"></div><div class="line"> uint32_t work(uint32_t events) {</div><div class="line"> <span class="keywordflow">if</span> (events & EPOLLRDHUP) {</div><div class="line"> <span class="keywordflow">try</span> { listener_.on_close(); } <span class="keywordflow">catch</span> (...) {}</d iv><div class="line"> <span class="keywordflow">return</span> 0;</div><div class="line"> }</div><div class="line"> <span class="keywordflow">try</span> {</div><div class="line"> <span class="keywordtype">int</span> accepted = check(::accept(fd_, NULL, 0), <span class="stringliteral">"accept"</span>);</div><div class="line"> container_.add_engine(listener_.on_accept(), accepted, <span class="keyword">true</span>);</div><div class="line"> <span class="keywordflow">return</span> EPOLLIN;</div><div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception& e) {</div><div class="line"> listener_.on_error(e.what());</div><div class="line"> <span class="keywordflow">return</span> 0;</div><div class="line"> }</div><div class="line"> }</div><div class="line"></div><div class="line"> std::string addr() { <span class="keywordflow">ret urn</span> addr_; }</div><div class="line"></div><div class="line"> <span class="keyword">private</span>:</div><div class="line"></div><div class="line"> <span class="keyword">static</span> <span class="keywordtype">int</span> socket_listen(<span class="keyword">const</span> std::string& addr) {</div><div class="line"> std::string msg = <span class="stringliteral">"listen on "</span>+addr;</div><div class="line"> unique_addrinfo ainfo(addr);</div><div class="line"> unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));</div><div class="line"> <span class="keywordtype">int</span> yes = 1;</div><div class="line"> check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, <span class="keyword">sizeof</span>(yes)), msg);</div><div class="line"> check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);</div><div class="line"> check(::listen(fd, 32), msg);</div><div class="line"> <span class ="keywordflow">return</span> fd.release();</div><div class="line"> }</div><div class="line"></div><div class="line"> std::string addr_;</div><div class="line"> std::function<proton::connection_options(const std::string&)> factory_;</div><div class="line"> epoll_container& container_;</div><div class="line"> <a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a> opts_;</div><div class="line"> <a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>& listener_;</div><div class="line">};</div><div class="line"></div><div class="line"></div><div class="line">epoll_container::epoll_container(<span class="keyword">const</span> std::string& <span class="keywordtype">id</span>)</div><div class="line"> : id_(<span class="keywordtype">id</span>), epoll_fd_(check(epoll_create(1), <span class="stringliteral">"epoll_create"</span>)),</div><div class="line "> interrupt_fd_(check(eventfd(1, 0), <span class="stringliteral">"eventfd"</span>)),</div><div class="line"> stopping_(<span class="keyword">false</span>), threads_(0)</div><div class="line">{}</div><div class="line"></div><div class="line">epoll_container::~epoll_container() {</div><div class="line"> <span class="keywordflow">try</span> {</div><div class="line"> stop(<a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>(<span class="stringliteral">"exception"</span>, <span class="stringliteral">"container shut-down"</span>));</div><div class="line"> wait();</div><div class="line"> } <span class="keywordflow">catch</span> (...) {}</div><div class="line">}</div><div class="line"></div><div class="line"><a class="code" href="classproton_1_1connection.html">proton::connection</a> epoll_container::add_engine(<a class="code" href="classproton_1_1connection__options.html">proton::connection_opti ons</a> opts, <span class="keywordtype">int</span> fd, <span class="keywordtype">bool</span> server)</div><div class="line">{</div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywordflow">if</span> (stopping_)</div><div class="line"> <span class="keywordflow">throw</span> <a class="code" href="structproton_1_1error.html">proton::error</a>(<span class="stringliteral">"container is stopping"</span>);</div><div class="line"> std::unique_ptr<pollable_engine> eng(<span class="keyword">new</span> pollable_engine(*<span class="keyword">this</span>, fd, epoll_fd_));</div><div class="line"> <span class="keywordflow">if</span> (server)</div><div class="line"> eng->engine().accept(opts);</div><div class="line"> <span class="keywordflow">else</span></div><div class="line"> eng->engine().connect(opts);</div><div class="line"> <a class="code" href="classproton_1_1connection.html">proton::connection</a> c = eng->engine().connection();</div><div class="line"> eng->notify();</div><div class="line"> engines_[eng.get()] = std::move(eng);</div><div class="line"> <span class="keywordflow">return</span> c;</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> epoll_container::erase(pollable* e) {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywordflow">if</span> (!engines_.erase(e)) {</div><div class="line"> pollable_listener* l = <span class="keyword">dynamic_cast<</span>pollable_listener*<span class="keyword">></span>(e);</div><div class="line"> <span class="keywordflow">if</span> (l)</div><div class="line"> listeners_.erase(l->addr());</div><div class="line"> }</div><div class="line"> idle_check(g);</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> epoll_container::idle_check (<span class="keyword">const</span> lock_guard&) {</div><div class="line"> <span class="keywordflow">if</span> (stopping_ && engines_.empty() && listeners_.empty())</div><div class="line"> interrupt();</div><div class="line">}</div><div class="line"></div><div class="line">proton::returned<proton::connection> epoll_container::connect(</div><div class="line"> <span class="keyword">const</span> std::string& addr, <span class="keyword">const</span> <a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a>& opts)</div><div class="line">{</div><div class="line"> std::string msg = <span class="stringliteral">"connect to "</span>+addr;</div><div class="line"> unique_addrinfo ainfo(addr);</div><div class="line"> unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));</div><div class="line"> check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);</div><div cl ass="line"> <span class="keywordflow">return</span> <a class="code" href="namespaceproton.html#a8cdc51abe42497af9f9b2966d1abdfa4">make_thread_safe</a>(add_engine(opts, fd.release(), <span class="keyword">false</span>));</div><div class="line">}</div><div class="line"></div><div class="line"><a class="code" href="classproton_1_1listener.html">proton::listener</a> epoll_container::listen(<span class="keyword">const</span> std::string& addr, <a class="code" href="classproton_1_1listen__handler.html">proton::listen_handler</a>& lh) {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> <span class="keywordflow">if</span> (stopping_)</div><div class="line"> <span class="keywordflow">throw</span> <a class="code" href="structproton_1_1error.html">proton::error</a>(<span class="stringliteral">"container is stopping"</span>);</div><div class="line"> <span class="keyword">auto</span>& l = listeners_[addr];</div><div class="line"> <sp an class="keywordflow">try</span> {</div><div class="line"> l.reset(<span class="keyword">new</span> pollable_listener(addr, lh, epoll_fd_, *<span class="keyword">this</span>));</div><div class="line"> l->notify();</div><div class="line"> <span class="keywordflow">return</span> <a class="code" href="classproton_1_1listener.html">proton::listener</a>(*<span class="keyword">this</span>, addr);</div><div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception& e) {</div><div class="line"> lh.<a name="a16"></a><a class="code" href="classproton_1_1listen__handler.html#a1bc02e9d18e7d20dde60dee73b3889ac">on_error</a>(e.what());</div><div class="line"> lh.<a name="a17"></a><a class="code" href="classproton_1_1listen__handler.html#a9eb8253590ebbf23639571ddc290e64a">on_close</a>();</div><div class="line"> <span class="keywordflow">throw</span>;</div><div class="line"> }</div><div class="li ne">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> epoll_container::stop_listening(<span class="keyword">const</span> std::string& addr) {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> listeners_.erase(addr);</div><div class="line"> idle_check(g);</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> epoll_container::run() {</div><div class="line"> ++threads_;</div><div class="line"> <span class="keywordflow">try</span> {</div><div class="line"> epoll_event e;</div><div class="line"> <span class="keywordflow">while</span>(<span class="keyword">true</span>) {</div><div class="line"> check(::epoll_wait(epoll_fd_, &e, 1, -1), <span class="stringliteral">"epoll_wait"</span>);</div><div class="line"> pollable* p = <span class="keyword">reinterpret_cast<</span>pollable*<span class="keyword">></span >(e.data.ptr);</div><div class="line"> <span >class="keywordflow">if</span> (!p)</div><div class="line"> ><span class="keywordflow">break</span>; <span class="comment">// >Interrupted</span></div><div class="line"> <span >class="keywordflow">if</span> (!p->do_work(e.events))</div><div >class="line"> erase(p);</div><div class="line"> >}</div><div class="line"> } <span class="keywordflow">catch</span> (<span >class="keyword">const</span> std::exception& e) {</div><div class="line"> > stop(<a class="code" >href="classproton_1_1error__condition.html">proton::error_condition</a>(<span >class="stringliteral">"exception"</span>, e.what()));</div><div >class="line"> }</div><div class="line"> <span >class="keywordflow">if</span> (--threads_ == 0)</div><div class="line"> > stopped_.notify_all();</div><div class="line">}</div><div >class="line"></div><div class="line"><span class="keywordtype">void</spa n> epoll_container::auto_stop(<span class="keywordtype">bool</span> <span class="keyword">set</span>) {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> stopping_ = <span class="keyword">set</span>;</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> epoll_container::stop(<span class="keyword">const</span> <a class="code" href="classproton_1_1error__condition.html">proton::error_condition</a>& err) {</div><div class="line"> lock_guard g(lock_);</div><div class="line"> stop_err_ = err;</div><div class="line"> interrupt();</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> epoll_container::wait() {</div><div class="line"> std::unique_lock<std::mutex> l(lock_);</div><div class="line"> stopped_.wait(l, [<span class="keyword">this</span>]() { <span class="keywordflow">return</span> this->threads_ == 0; } );</div><div cl ass="line"> <span class="keywordflow">for</span> (<span class="keyword">auto</span>& eng : engines_)</div><div class="line"> eng.second->engine().disconnected(stop_err_);</div><div class="line"> listeners_.clear();</div><div class="line"> engines_.clear();</div><div class="line">}</div><div class="line"></div><div class="line"><sp
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org