On Wednesday, January 3, 2018 at 7:32:42 PM UTC-5, Matt Mitchell wrote:
>
> The way my service works is the client will establish a bidi stream. The
> server then sends a message, from which the client will send n-messages
> back.
>

So I'd expect the "starter" code for that (which we'll need to improve) to
look closer to:
public void onNext(MyMessage msgFromServer) {
  for (MyMessage msgToServer : process(msgFromServer)) {
    while(!requestStream.isReady()) {sleep();}
    requestStream.onNext(msgToServer);
  }
  // Doesn't actually do much; should just use autoInboundFlowControl
  requestStream.request(1);
}

Disabling auto inbound flow control (which I'm assuming you did, since the
request() call is there) doesn't change anything here: the automatic inbound
flow control calls request(1) when onNext returns, so you've manually
reimplemented the "automatic" flow control.

Note that this could also be okay if you have all the response messages
already (in which case you can't push back on the message generation):
public void onNext(MyMessage msgFromServer) {
  List<MyMessage> msgs = process(msgFromServer);
  for (MyMessage msgToServer : msgs) {
    requestStream.onNext(msgToServer);
  }
  while(!requestStream.isReady()) {sleep();}
  // Doesn't actually do much; should just use autoInboundFlowControl
  requestStream.request(1);
}

If you already have the message, I tend to recommend you go ahead and send
it. There are some exceptions to that, but they get special-case-y. The point
of isReady/onReady is to slow down the message producer, so if the messages
are already produced, you might as well enqueue them.

On Fri, Jan 26, 2018 at 10:04 AM, <[email protected]> wrote:

> Since my client sends back a stream of results for each server request
> message, and the setOnReadyHandler call and onNext are on the same thread
>

The answer is to return from onNext so you can receive the onReady
callbacks. Yes, that can be a pain, but it is an async API. (We're looking
into making a blocking streaming API, which would make this sort of thing
much easier. But such a thing requires effort to avoid deadlock.)

So let's look at the case where the client generates the messages
all-at-once:
stub.someMethod(new ClientResponseObserver<MyReq, MyResp>() {
  private ClientCallStreamObserver<MyReq> requestStream;
  private boolean needRequest;

  @Override public void beforeStart(
      ClientCallStreamObserver<MyReq> requestStream) {
    this.requestStream = requestStream;
    requestStream.disableAutoInboundFlowControl();
    requestStream.setOnReadyHandler(() -> onReady());
  }

  private void onReady() {
    if (requestStream.isReady() && needRequest) {
      requestStream.request(1);
      needRequest = false;
    }
  }

  @Override public void onNext(MyResp msgFromServer) {
    // We only request messages from the server when flow control is ready,
    // so it should be ready now.
    List<MyReq> msgs = process(msgFromServer);
    for (MyReq msgToServer : msgs) {
      requestStream.onNext(msgToServer);
    }
    if (requestStream.isReady()) {
      requestStream.request(1);
    } else {
      needRequest = true;
    }
  }
});
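
To see the deferred request(1) logic in isolation, here is a minimal sketch
that runs without gRPC. FakeRequestStream and DeferredRequestClient are
hypothetical names made up for this illustration; the fake only models
isReady(), a non-blocking onNext(), request(), and an onReady callback fired
when the stream goes from not-ready back to ready. It is not how gRPC's
transport is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ClientCallStreamObserver; not a gRPC class. */
final class FakeRequestStream {
  final List<String> sent = new ArrayList<>();
  int requested = 0;            // total messages asked for via request(n)
  private int buffered = 0;     // messages queued but not yet "on the wire"
  private final int readyThreshold;
  private Runnable onReadyHandler = () -> {};

  FakeRequestStream(int readyThreshold) { this.readyThreshold = readyThreshold; }

  boolean isReady() { return buffered < readyThreshold; }
  void setOnReadyHandler(Runnable handler) { onReadyHandler = handler; }
  void onNext(String msg) { sent.add(msg); buffered++; }  // enqueues, never blocks
  void request(int n) { requested += n; }

  /** Simulates the transport flushing; fires onReady on a not-ready -> ready change. */
  void drain() {
    boolean wasReady = isReady();
    buffered = 0;
    if (!wasReady) {
      onReadyHandler.run();
    }
  }
}

/** Same shape as the observer above, with String in place of MyReq/MyResp. */
final class DeferredRequestClient {
  private final FakeRequestStream requestStream;
  private boolean needRequest;

  DeferredRequestClient(FakeRequestStream requestStream) {
    this.requestStream = requestStream;
    requestStream.setOnReadyHandler(this::onReady);
  }

  private void onReady() {
    if (requestStream.isReady() && needRequest) {
      needRequest = false;
      requestStream.request(1);
    }
  }

  /** Sends every reply immediately, then asks for more only if still ready. */
  void onNext(List<String> replies) {
    for (String reply : replies) {
      requestStream.onNext(reply);
    }
    if (requestStream.isReady()) {
      requestStream.request(1);
    } else {
      needRequest = true;  // onReady() will issue the request later
    }
  }
}
```

With a threshold of 2 and three replies, onNext returns without calling
request(1); the request is only issued once the fake stream drains and the
onReady callback runs.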

And now for further optimizing the client to avoid generating messages
until we can send them:
stub.someMethod(new ClientResponseObserver<MyReq, MyResp>() {
  private ClientCallStreamObserver<MyReq> requestStream;
  private Iterator<MyReq> requests;

  @Override public void beforeStart(
      ClientCallStreamObserver<MyReq> requestStream) {
    this.requestStream = requestStream;
    requestStream.disableAutoInboundFlowControl();
    requestStream.setOnReadyHandler(() -> onReady());
  }

  private void onReady() {
    if (requests == null) {
      return;
    }
    while (requestStream.isReady() && requests.hasNext()) {
      // We only generate a message if we can send it
      MyReq msg = requests.next();
      requestStream.onNext(msg);
    }
    if (!requests.hasNext()) {
      requests = null;
      requestStream.request(1);
    }
  }

  @Override public void onNext(MyResp msgFromServer) {
    assert requests == null;
    requests = process(msgFromServer);
    onReady(); // no-op if not ready
  }
});
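
The effect of the lazy version can be checked with a small gRPC-free
simulation. This is only a sketch under assumptions: BoundedFakeStream,
LazyDrainClient, and countingReplies are hypothetical names invented here,
and the fake stream models nothing beyond isReady(), onNext(), request(),
and an onReady callback on the not-ready to ready transition.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ClientCallStreamObserver; not a gRPC class. */
final class BoundedFakeStream {
  int sentCount = 0;
  int requested = 0;
  private int buffered = 0;
  private final int capacity;
  private Runnable onReadyHandler = () -> {};

  BoundedFakeStream(int capacity) { this.capacity = capacity; }

  boolean isReady() { return buffered < capacity; }
  void setOnReadyHandler(Runnable handler) { onReadyHandler = handler; }
  void onNext(String msg) { sentCount++; buffered++; }
  void request(int n) { requested += n; }

  /** Simulates a flush; fires onReady only if the stream was not ready before. */
  void drain() {
    boolean wasReady = isReady();
    buffered = 0;
    if (!wasReady) {
      onReadyHandler.run();
    }
  }
}

final class LazyDrainClient {
  private final BoundedFakeStream requestStream;
  private Iterator<String> requests;

  LazyDrainClient(BoundedFakeStream requestStream) {
    this.requestStream = requestStream;
    requestStream.setOnReadyHandler(this::onReady);
  }

  private void onReady() {
    if (requests == null) {
      return;
    }
    while (requestStream.isReady() && requests.hasNext()) {
      requestStream.onNext(requests.next());  // generated only when sendable
    }
    if (!requests.hasNext()) {
      requests = null;
      requestStream.request(1);  // done with this batch; ask for the next message
    }
  }

  void onNext(Iterator<String> replies) {
    requests = replies;
    onReady();  // sends nothing if the stream is not ready
  }

  /** Hypothetical lazy producer that counts how many replies were generated. */
  static Iterator<String> countingReplies(int total, AtomicInteger generated) {
    return new Iterator<String>() {
      private int next = 0;
      @Override public boolean hasNext() { return next < total; }
      @Override public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        generated.incrementAndGet();
        return "reply-" + next++;
      }
    };
  }
}
```

With a capacity of 2 and five replies, only two replies are generated before
onNext returns; the rest are produced as the fake stream drains, and
request(1) is called once, after the iterator is exhausted.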
