On Mon, Jul 6, 2020 at 7:41 AM Vitali Lovich <[email protected]> wrote:
> > On Mon, Jul 6, 2020 at 7:26 AM Kenton Varda <[email protected]> wrote: > >> Almost. Two issues I can think of: >> >> 1. After each pump completes, you probably want to call shutdownWrite() >> to propagate the EOF. >> 2. `joinPromises()` always waits for all promises to resolve. I've often >> found that this turns out to be the wrong behavior when one of the joined >> promises throws an exception. Usually you want the other tasks canceled in >> that case. I think that might be the case here -- if you get an I/O error >> in one direction, you probably want to kill the whole stream. Then again, >> probably that'll happen anyway in most cases. (Whereas, EOF is not an >> error, so you do want to wait for the other promise in that case.) >> > > So more like this? > > return stream1.pumpTo(stream2).ignoreResult().then( > [&] {stream2.shutdownWrite()}, > [&](kj::Exception&& e){ > stream1.shutdownWrite(); > stream1.abortRead(); > stream2.shutdownWrite(); > stream2.abortRead(); > })).exclusiveJoin( > stream2.pumpTo(stream1).ignoreResult().then( > [&] {stream1.shutdownWrite()}, > [&](kj::Exception&& e){ > stream1.shutdownWrite(); > stream1.abortRead(); > stream2.shutdownWrite(); > stream2.abortRead(); > })); > Actually, I think this is two hand-wavy. Also I think the original inclusive join is actually correct because I want to ensure that both sides finish any I/O that may be in flight. Otherwise I may end the stream prematurely just because 1 end finished (e.g. 1 end sends some data & then closes because its done - the peer won't receive all the data). My current code looks something like: void completelyClose(kj::AsyncIoStream& stream) { stream.shutdownWrite(); stream.abortRead(); }; kj::Canceler stillRunning; auto stream1 = ioContext.lowLevelProvider->wrapSocketFd( rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); stillRunning.wrap(listener->accept().then([&](kj::Own<kj::AsyncIoStream>&& stream2) mutable { auto paf = kj::newPromiseAndFulfiller<void>(); auto unsafeStream2 = stream2.get(); kj::Vector<kj::Promise<void>> pumped; pumped.add(stream1->pumpTo(stream2) .ignoreResult() .then( [stream2 = stream2.get()] { stream2->shutdownWrite(); }, [&stream1, stream2 = stream2.get()] (kj::Exception&& e) { completelyClose(*stream1); completelyClose(*stream2); }); pumped.add(unsafeStream2->pumpTo(stream1) .ignoreResult() .then( [&stream1] { stream1->shutdownWrite(); }, [&stream1, stream2 = unsafeStream2] (kj::Exception&& e) { completelyClose(*stream1); completelyClose(*stream2); })); return kj::joinPromises(pumped.releaseAsArray()) .attach( paf.promise.fork(), // AcceptedConnection simply fulfills on destruction. kj::heap<AcceptedConnection>(kj::mv(stream2), kj::mv(paf.fulfiller)), ); })).wait(waitScope); The fulfiller stuff is another place I'm pretty sure I haven't done right. I was just going off of what's happening under the hood when you wait on the promise that TwoPartyServer returns when it listens. > >> On another note, a warning: I'm likely to change the AsyncOutputStream >> interface significantly in the future, in order to replace >> `shutdownWrite()` with something that returns a promise, and to make it so >> that if you don't explicitly shut down a stream, then it's treated as an >> error. Currently, AsyncOutputStream's destructor implicitly sends a clean >> EOF, but that's the wrong thing to do when the sender terminated >> prematurely due to an exception. So, your code will need some updating when >> that happens. >> > > Yeah, as long as it's a strict compilation error if there's changes that > need to be made to make previously correct work correct again, that's fine > (+ maybe documentation in the release notes on how to migrate). Can just > treat that as the cost of updating to a new version of the library. More > challenging would be any implicit behavioural changes that don't change the > API. Those can be harder to catch even with unit tests. > > Also while you're at it, does it make sense to have 1 function that does > both shutdownWrite/abortRead so that if both are returning futures, joining > them correctly can be hidden as an implementation detail? I'm finding the > promises stuff to be straightforward for simple cases but anything more > custom is harder to reason about in terms of making sure they compose > correctly - I never feel quite comfortable if I've written the promise code > correctly (especially with all the different types of promises, Canceler & > TaskSet). Hopefully it's easy to integrate C++20 coroutine support to make > things read more linearly again. Not sure when I'll get to use C++20 though. > > >> >> -Kenton >> >> On Sun, Jul 5, 2020 at 8:13 PM Vitali Lovich <[email protected]> wrote: >> >>> I was wondering what would be the best way to bridge 2 >>> kj::AsyncIoStreams to each other (read to write/write to read) so that they >>> act as pass-through? I'm assuming something like: >>> >>> auto pumped = kj::ArrayBuilder<kj::Promise<void>>(2); >>> pumped.add(stream1.pumpTo(stream2).ignoreResult()); >>> pumped.add(stream2.pumpTo(stream1).ignoreResult()); >>> return kj::joinPromises(pumped.finish()).ignoreResult(); >>> >>> Thanks, >>> Vitali >>> >>> -- >>> You received this message because you are subscribed to the Google >>> Groups "Cap'n Proto" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> To view this discussion on the web visit >>> https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com >>> <https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com?utm_medium=email&utm_source=footer> >>> . >>> >> -- You received this message because you are subscribed to the Google Groups "Cap'n Proto" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/capnproto/CAF8PYMiXkZ9j8Z%2BKQ%2BnspQzLeaMBdYSDiwhNUW0ZYhpOhKme7g%40mail.gmail.com.
