On Mon, Jul 6, 2020 at 7:41 AM Vitali Lovich <[email protected]> wrote:

>
> On Mon, Jul 6, 2020 at 7:26 AM Kenton Varda <[email protected]> wrote:
>
>> Almost. Two issues I can think of:
>>
>> 1. After each pump completes, you probably want to call shutdownWrite()
>> to propagate the EOF.
>> 2. `joinPromises()` always waits for all promises to resolve. I've often
>> found that this turns out to be the wrong behavior when one of the joined
>> promises throws an exception. Usually you want the other tasks canceled in
>> that case. I think that might be the case here -- if you get an I/O error
>> in one direction, you probably want to kill the whole stream. Then again,
>> probably that'll happen anyway in most cases. (Whereas, EOF is not an
>> error, so you do want to wait for the other promise in that case.)
>>
>
> So more like this?
>
> return stream1.pumpTo(stream2).ignoreResult().then(
>   [&] { stream2.shutdownWrite(); },
>   [&](kj::Exception&& e) {
>     stream1.shutdownWrite();
>     stream1.abortRead();
>     stream2.shutdownWrite();
>     stream2.abortRead();
>   }).exclusiveJoin(
>     stream2.pumpTo(stream1).ignoreResult().then(
>       [&] { stream1.shutdownWrite(); },
>       [&](kj::Exception&& e) {
>         stream1.shutdownWrite();
>         stream1.abortRead();
>         stream2.shutdownWrite();
>         stream2.abortRead();
>       }));
>
Actually, I think this is too hand-wavy. Also, I think the original
inclusive join is actually correct, because I want to ensure that both sides
finish any I/O that may be in flight. Otherwise I may end the stream
prematurely just because one end finished (e.g. one end sends some data and
then closes because it's done, so the peer won't receive all the data).
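
To make the difference concrete, here is a toy model in plain C++ (no KJ
involved). Direction, JoinPolicy and runBridge are hypothetical names made up
for illustration; each step() forwards at most one chunk, and the two policies
stand in for joinPromises() and exclusiveJoin() respectively. It's only a
sketch of the argument above, not how KJ actually schedules anything.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy model of one direction of the bridge (hypothetical, not a KJ type).
struct Direction {
  std::deque<std::string> pending;     // chunks the source still has to send
  std::vector<std::string> delivered;  // chunks the sink has received
  bool eof = false;                    // set once the source is drained

  // Forward at most one chunk; report EOF on the step after the last chunk.
  void step() {
    if (eof) return;
    if (pending.empty()) { eof = true; return; }
    delivered.push_back(pending.front());
    pending.pop_front();
  }
};

// WaitForBoth models an inclusive join, StopAtFirst an exclusive join.
enum class JoinPolicy { WaitForBoth, StopAtFirst };

// Runs both directions round-robin and returns the total chunks delivered.
std::size_t runBridge(Direction& aToB, Direction& bToA, JoinPolicy policy) {
  for (;;) {
    aToB.step();
    bToA.step();
    bool done = policy == JoinPolicy::WaitForBoth
        ? (aToB.eof && bToA.eof)
        : (aToB.eof || bToA.eof);
    if (done) break;
  }
  return aToB.delivered.size() + bToA.delivered.size();
}
```

With one chunk queued in one direction and three in the other, WaitForBoth
delivers all four chunks, while StopAtFirst stops after three and leaves one
chunk undelivered.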

My current code looks something like:

void completelyClose(kj::AsyncIoStream& stream) {
    stream.shutdownWrite();
    stream.abortRead();
}

kj::Canceler stillRunning;
auto stream1 = ioContext.lowLevelProvider->wrapSocketFd(
            rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);

stillRunning.wrap(listener->accept().then(
    [&](kj::Own<kj::AsyncIoStream>&& stream2) mutable {
  auto paf = kj::newPromiseAndFulfiller<void>();
  auto unsafeStream2 = stream2.get();

  kj::Vector<kj::Promise<void>> pumped;
  // pumpTo() takes a reference, so the kj::Own pointers are dereferenced.
  pumped.add(stream1->pumpTo(*stream2)
    .ignoreResult()
    .then(
      [stream2 = stream2.get()] { stream2->shutdownWrite(); },
      [&stream1, stream2 = stream2.get()] (kj::Exception&& e) {
        completelyClose(*stream1);
        completelyClose(*stream2);
      }));
  pumped.add(unsafeStream2->pumpTo(*stream1)
    .ignoreResult()
    .then(
      [&stream1] { stream1->shutdownWrite(); },
      [&stream1, stream2 = unsafeStream2] (kj::Exception&& e) {
        completelyClose(*stream1);
        completelyClose(*stream2);
      }));
  return kj::joinPromises(pumped.releaseAsArray())
    .attach(
      paf.promise.fork(),
      // AcceptedConnection simply fulfills on destruction.
      kj::heap<AcceptedConnection>(kj::mv(stream2), kj::mv(paf.fulfiller)));
})).wait(waitScope);

The fulfiller stuff is another place I'm pretty sure I haven't done right.
I was just going off of what's happening under the hood when you wait on
the promise that TwoPartyServer returns when it listens.


>
>> On another note, a warning: I'm likely to change the AsyncOutputStream
>> interface significantly in the future, in order to replace
>> `shutdownWrite()` with something that returns a promise, and to make it so
>> that if you don't explicitly shut down a stream, then it's treated as an
>> error. Currently, AsyncOutputStream's destructor implicitly sends a clean
>> EOF, but that's the wrong thing to do when the sender terminated
>> prematurely due to an exception. So, your code will need some updating when
>> that happens.
>>
>
> Yeah, as long as it's a strict compilation error if there's changes that
> need to be made to make previously correct work correct again, that's fine
> (+ maybe documentation in the release notes on how to migrate). Can just
> treat that as the cost of updating to a new version of the library. More
> challenging would be any implicit behavioural changes that don't change the
> API. Those can be harder to catch even with unit tests.
>
> Also while you're at it, does it make sense to have 1 function that does
> both shutdownWrite/abortRead so that if both are returning futures, joining
> them correctly can be hidden as an implementation detail? I'm finding the
> promises stuff to be straightforward for simple cases but anything more
> custom is harder to reason about in terms of making sure they compose
> correctly - I never feel quite comfortable if I've written the promise code
> correctly (especially with all the different types of promises, Canceler &
> TaskSet). Hopefully it's easy to integrate C++20 coroutine support to make
> things read more linearly again. Not sure when I'll get to use C++20 though.
>
>
>>
>> -Kenton
>>
>> On Sun, Jul 5, 2020 at 8:13 PM Vitali Lovich <[email protected]> wrote:
>>
>>> I was wondering what would be the best way to bridge 2
>>> kj::AsyncIoStreams to each other (read to write/write to read) so that they
>>> act as pass-through? I'm assuming something like:
>>>
>>> auto pumped = kj::ArrayBuilder<kj::Promise<void>>(2);
>>> pumped.add(stream1.pumpTo(stream2).ignoreResult());
>>> pumped.add(stream2.pumpTo(stream1).ignoreResult());
>>> return kj::joinPromises(pumped.finish()).ignoreResult();
>>>
>>> Thanks,
>>> Vitali
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Cap'n Proto" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com
>>> <https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com?utm_medium=email&utm_source=footer>
>>> .
>>>
>>

