Hmm, I don't understand what you're doing with paf.promise.fork() -- it looks like you're merely attaching it to another promise, not actually waiting on it.
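[A possible alternative shape, sketched by the editor and not tested: it reuses the `pumped`, `paf`, and `stream2` names from the code quoted later in this thread, lets attach() handle the stream's lifetime, and fires the fulfiller explicitly in a then() instead of attaching a fork.]

```cpp
// Untested sketch: attach() keeps stream2 alive for the promise's lifetime;
// fulfill explicitly on success rather than relying on destruction order.
return kj::joinPromises(pumped.releaseAsArray())
    .attach(kj::mv(stream2))
    .then([fulfiller = kj::mv(paf.fulfiller)]() mutable {
      fulfiller->fulfill();
    });
// (If the join rejects, the fulfiller is destroyed unfulfilled, which
// rejects its promise -- anyone waiting on paf.promise still wakes up.)
```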
I am a little concerned about your use of shutdownWrite() / abortRead() in a way that assumes these functions will cause any concurrent write or read to throw an exception. That actually isn't quite the intent here. These methods are intended to inform the *other side* of the connection that the connection has ended; they weren't designed to cancel operations on your own end. I think that the way they are implemented for native streams today might produce the behavior you want mostly by accident. The "proper" thing to do would be to cancel any concurrent operation first by destroying the associated promise. So before calling abortRead(), make sure to destroy any promises that represent a read() operation.

Unfortunately, this brings us back to the problem with joinPromises(). What you really want is for joinPromises() to fail fast if either branch fails, but otherwise wait for both to finish. I guess one rather-ugly way you could do that today is to create a PromiseAndFulfiller for cancellation purposes. Use joinPromises().exclusiveJoin(paf.promise). Then reject the fulfiller if you catch an exception on either side. But that's pretty ugly. We need a joinFailfast(), I think...

-Kenton

On Mon, Jul 6, 2020 at 10:13 AM Vitali Lovich <[email protected]> wrote:

> On Mon, Jul 6, 2020 at 7:41 AM Vitali Lovich <[email protected]> wrote:
>
>> On Mon, Jul 6, 2020 at 7:26 AM Kenton Varda <[email protected]>
>> wrote:
>>
>>> Almost. Two issues I can think of:
>>>
>>> 1. After each pump completes, you probably want to call shutdownWrite()
>>> to propagate the EOF.
>>> 2. `joinPromises()` always waits for all promises to resolve. I've often
>>> found that this turns out to be the wrong behavior when one of the
>>> joined promises throws an exception. Usually you want the other tasks
>>> canceled in that case. I think that might be the case here -- if you get
>>> an I/O error in one direction, you probably want to kill the whole
>>> stream.
>>> Then again, probably that'll happen anyway in most cases. (Whereas,
>>> EOF is not an error, so you do want to wait for the other promise in
>>> that case.)
>>
>> So more like this?
>>
>> return stream1.pumpTo(stream2).ignoreResult().then(
>>     [&] { stream2.shutdownWrite(); },
>>     [&](kj::Exception&& e) {
>>       stream1.shutdownWrite();
>>       stream1.abortRead();
>>       stream2.shutdownWrite();
>>       stream2.abortRead();
>>     }).exclusiveJoin(
>>         stream2.pumpTo(stream1).ignoreResult().then(
>>             [&] { stream1.shutdownWrite(); },
>>             [&](kj::Exception&& e) {
>>               stream1.shutdownWrite();
>>               stream1.abortRead();
>>               stream2.shutdownWrite();
>>               stream2.abortRead();
>>             }));
>
> Actually, I think this is too hand-wavy. Also, I think the original
> inclusive join is actually correct, because I want to ensure that both
> sides finish any I/O that may be in flight. Otherwise I may end the stream
> prematurely just because one end finished (e.g. one end sends some data
> and then closes because it's done -- the peer won't receive all the data).
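[For reference, the PromiseAndFulfiller workaround Kenton describes above might look roughly like the following. This is an untested editor's sketch against the KJ API; `stream1` and `stream2` are kj::AsyncIoStream references as in the original question, and error-handling details may need adjusting.]

```cpp
// Untested sketch: wait for both pumps, but fail fast if either throws.
auto paf = kj::newPromiseAndFulfiller<void>();
auto onError = [fulfiller = paf.fulfiller.get()](kj::Exception&& e)
    -> kj::Promise<void> {
  fulfiller->reject(kj::cp(e));          // wakes the exclusiveJoin branch
  return kj::Promise<void>(kj::mv(e));   // propagate the original error too
};

kj::Vector<kj::Promise<void>> pumped;
pumped.add(stream1.pumpTo(stream2).ignoreResult().catch_(onError));
pumped.add(stream2.pumpTo(stream1).ignoreResult().catch_(onError));

return kj::joinPromises(pumped.releaseAsArray())  // resolves when both succeed
    .exclusiveJoin(kj::mv(paf.promise))           // ...or rejects on first error
    .attach(kj::mv(paf.fulfiller));               // keep the fulfiller alive
```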
>
> My current code looks something like:
>
> void completelyClose(kj::AsyncIoStream& stream) {
>   stream.shutdownWrite();
>   stream.abortRead();
> }
>
> kj::Canceler stillRunning;
> auto stream1 = ioContext.lowLevelProvider->wrapSocketFd(
>     rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
>
> stillRunning.wrap(listener->accept().then(
>     [&](kj::Own<kj::AsyncIoStream>&& stream2) mutable {
>   auto paf = kj::newPromiseAndFulfiller<void>();
>   auto unsafeStream2 = stream2.get();
>
>   kj::Vector<kj::Promise<void>> pumped;
>   pumped.add(stream1->pumpTo(*stream2)
>       .ignoreResult()
>       .then(
>           [stream2 = stream2.get()] { stream2->shutdownWrite(); },
>           [&stream1, stream2 = stream2.get()](kj::Exception&& e) {
>             completelyClose(*stream1);
>             completelyClose(*stream2);
>           }));
>   pumped.add(unsafeStream2->pumpTo(*stream1)
>       .ignoreResult()
>       .then(
>           [&stream1] { stream1->shutdownWrite(); },
>           [&stream1, stream2 = unsafeStream2](kj::Exception&& e) {
>             completelyClose(*stream1);
>             completelyClose(*stream2);
>           }));
>   return kj::joinPromises(pumped.releaseAsArray())
>       .attach(
>           paf.promise.fork(),
>           // AcceptedConnection simply fulfills on destruction.
>           kj::heap<AcceptedConnection>(kj::mv(stream2), kj::mv(paf.fulfiller)));
> })).wait(waitScope);
>
> The fulfiller stuff is another place I'm pretty sure I haven't done right.
> I was just going off of what's happening under the hood when you wait on
> the promise that TwoPartyServer returns when it listens.
>
>>
>>> On another note, a warning: I'm likely to change the AsyncOutputStream
>>> interface significantly in the future, in order to replace
>>> `shutdownWrite()` with something that returns a promise, and to make it
>>> so that if you don't explicitly shut down a stream, then it's treated as
>>> an error. Currently, AsyncOutputStream's destructor implicitly sends a
>>> clean EOF, but that's the wrong thing to do when the sender terminated
>>> prematurely due to an exception.
>>> So, your code will need some updating when that happens.
>>
>> Yeah, as long as it's a strict compilation error when there are changes
>> needed to make previously correct code correct again, that's fine (plus,
>> maybe, documentation in the release notes on how to migrate). We can just
>> treat that as the cost of updating to a new version of the library. More
>> challenging would be any implicit behavioural changes that don't change
>> the API. Those can be harder to catch, even with unit tests.
>>
>> Also, while you're at it, does it make sense to have one function that
>> does both shutdownWrite() and abortRead(), so that if both end up
>> returning promises, joining them correctly can be hidden as an
>> implementation detail? I'm finding the promises stuff straightforward for
>> simple cases, but anything more custom is harder to reason about in terms
>> of making sure everything composes correctly -- I never feel quite
>> confident that I've written the promise code correctly (especially with
>> all the different kinds of promises, plus Canceler & TaskSet). Hopefully
>> it's easy to integrate C++20 coroutine support to make things read more
>> linearly again. Not sure when I'll get to use C++20, though.
>>
>>> -Kenton
>>>
>>> On Sun, Jul 5, 2020 at 8:13 PM Vitali Lovich <[email protected]> wrote:
>>>
>>>> I was wondering what would be the best way to bridge two
>>>> kj::AsyncIoStreams to each other (read to write / write to read) so
>>>> that they act as pass-throughs? I'm assuming something like:
>>>>
>>>> auto pumped = kj::heapArrayBuilder<kj::Promise<void>>(2);
>>>> pumped.add(stream1.pumpTo(stream2).ignoreResult());
>>>> pumped.add(stream2.pumpTo(stream1).ignoreResult());
>>>> return kj::joinPromises(pumped.finish()).ignoreResult();
>>>>
>>>> Thanks,
>>>> Vitali
>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Cap'n Proto" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to [email protected].
>>>> To view this discussion on the web visit
>>>> https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com.

-- 
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To view this discussion on the web visit https://groups.google.com/d/msgid/capnproto/CAJouXQmCfi3j6bmvU%2Br9OMnkFQnzMr1N4SNeuMsxw-q_2Bv_TA%40mail.gmail.com.
