[ 
https://issues.apache.org/jira/browse/HTTPCLIENT-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590695#comment-16590695
 ] 

ASF GitHub Bot commented on HTTPCLIENT-1942:
--------------------------------------------

Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212421284
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.concurrent.BasicFuture;
    +import org.apache.hc.core5.concurrent.FutureCallback;
    +import org.apache.hc.core5.http.EntityDetails;
    +import org.apache.hc.core5.http.Header;
    +import org.apache.hc.core5.http.HttpResponse;
    +import org.apache.hc.core5.http.Message;
    +import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
    +import org.apache.hc.core5.http.nio.CapacityChannel;
    +import org.apache.hc.core5.http.protocol.HttpContext;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.Future;
    +
    +/**
    + * An {@link AsyncResponseConsumer} that publishes the response body 
through
    + * a {@link Publisher}, as defined by the Reactive Streams specification. 
The
    + * response is represented as a {@link Message} consisting of a {@link
    + * HttpResponse} representing the headers and a {@link Publisher} 
representing
    + * the response body as an asynchronous stream of {@link ByteBuffer} 
instances.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +public final class ReactiveResponseConsumer implements 
AsyncResponseConsumer<Void> {
    +
    +    private final ReactiveDataConsumer reactiveDataConsumer = new 
ReactiveDataConsumer();
    +    private final List<Header> trailers = Collections.synchronizedList(new 
ArrayList<Header>());
    +    private final BasicFuture<Message<HttpResponse, 
Publisher<ByteBuffer>>> responseFuture;
    +
    +    private volatile BasicFuture<Void> responseCompletion;
    +    private volatile HttpResponse informationResponse;
    +    private volatile EntityDetails entityDetails;
    +
    +    /**
    +     * Creates a {@code ReactiveResponseConsumer}.
    +     */
    +    public ReactiveResponseConsumer() {
    +        this.responseFuture = new BasicFuture<>(null);
    +    }
    +
    +    /**
    +     * Creates a {@code ReactiveResponseConsumer} that will call back the 
supplied {@link FutureCallback} with a
    +     * streamable response.
    +     *
    +     * @param responseCallback the callback to invoke when the response is 
available for consumption.
    +     */
    +    public ReactiveResponseConsumer(final 
FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
    +        this.responseFuture = new 
BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
    +    }
    +
    +    public Future<Message<HttpResponse, Publisher<ByteBuffer>>> 
getResponseFuture() {
    +        return responseFuture;
    +    }
    +
    +    /**
    +     * Returns the intermediate (1xx) HTTP response if one was received.
    +     *
    +     * @return the information response, or {@code null} if none.
    +     */
    +    public HttpResponse getInformationResponse() {
    +        return informationResponse;
    +    }
    +
    +    /**
    +     * Returns the response entity details.
    +     *
    +     * @return the entity details, or {@code null} if none.
    +     */
    +    public EntityDetails getEntityDetails() {
    +        return entityDetails;
    +    }
    +
    +    /**
    +     * Returns the trailers received at the end of the response.
    +     *
    +     * @return a non-null list of zero or more trailers.
    +     */
    +    public List<Header> getTrailers() {
    +        return trailers;
    +    }
    +
    +    @Override
    +    public void consumeResponse(
    +        final HttpResponse response,
    +        final EntityDetails entityDetails,
    +        final HttpContext httpContext,
    +        final FutureCallback<Void> resultCallback
    +    ) {
    +        this.entityDetails = entityDetails;
    +        this.responseCompletion = new BasicFuture<>(resultCallback);
    +        this.responseFuture.completed(new Message<HttpResponse, 
Publisher<ByteBuffer>>(response, reactiveDataConsumer));
    --- End diff --
    
    I'm not sure which `Consumer` you mean: `java.util.function.Consumer` was 
only introduced in Java 8, and I thought this entire project was targeting 
JDK7. (RxJava provides a `Consumer<T>` interface, but the Reactive Streams API 
does not.) Either way, I would argue that we *do* need to return a `Future`, 
for the same reason that the `#execute` method returns a `Future`: callers need 
a single object that they can block on (or decorate with callback logic) to 
wait for success, failure, or cancellation.
    
    In the integration tests, there is a fairly consistent idiom:
    
    ```java
            final ReactiveResponseConsumer consumer = new 
ReactiveResponseConsumer();
            requester.execute(request, consumer, Timeout.ofSeconds(2), null);
            final Message<HttpResponse, Publisher<ByteBuffer>> response = 
consumer.getResponseFuture().get();
    ```
    
    With a `Consumer`-based API, a caller might write:
    
    ```java
            final BlockingQueue<Message<HttpResponse, Publisher<ByteBuffer>>> 
streamingResponse =
                new ArrayBlockingQueue<>(1);
            final ReactiveResponseConsumer consumer =
                new ReactiveResponseConsumer(new Consumer<Message<HttpResponse, 
Publisher<ByteBuffer>>>() {
                    @Override
                    public void accept(final Message<HttpResponse, 
Publisher<ByteBuffer>> httpResponsePublisherMessage) {
                        streamingResponse.offer(httpResponsePublisherMessage);
                    }
                });
            requester.execute(request, consumer, Timeout.ofSeconds(2), null);
            final Message<HttpResponse, Publisher<ByteBuffer>> response = 
streamingResponse.take();
    ```
    
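    However, the above code will block forever if an exception occurs and the 
response never arrives, because nothing is ever offered to the queue and 
`take()` waits indefinitely. One way to fix this is to set up a 
`CompletableFuture` or `BasicFuture` and then complete it from *both* 
callbacks (the consumer's callback and the one passed to `#execute`):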
    
    ```java
            final CompletableFuture<Message<HttpResponse, 
Publisher<ByteBuffer>>> streamingResponse =
                new CompletableFuture<>();
            final ReactiveResponseConsumer consumer =
                new ReactiveResponseConsumer(new Consumer<Message<HttpResponse, 
Publisher<ByteBuffer>>>() {
                    @Override
                    public void accept(final Message<HttpResponse, 
Publisher<ByteBuffer>> httpResponsePublisherMessage) {
                        
streamingResponse.complete(httpResponsePublisherMessage);
                    }
                });
            requester.execute(request, consumer, Timeout.ofSeconds(2), new 
FutureCallback<Void>() {
                @Override
                public void completed(Void result) {
                    // TODO: Can this race with the other callback?
                    streamingResponse.completeExceptionally(new 
RuntimeException("No stream returned"));
                }
    
                @Override
                public void failed(Exception ex) {
                    streamingResponse.completeExceptionally(ex);
                }
    
                @Override
                public void cancelled() {
                    streamingResponse.completeExceptionally(new 
CancellationException());
                }
            });
            final Message<HttpResponse, Publisher<ByteBuffer>> response = 
streamingResponse.join();
    ```
    
    In short, constructing a correctly functioning `Future` for the streaming 
response is definitely something I want to do on the caller's behalf, rather 
than asking the caller to construct one manually from lower-level primitives.


> Add support for Reactive Streams
> --------------------------------
>
>                 Key: HTTPCLIENT-1942
>                 URL: https://issues.apache.org/jira/browse/HTTPCLIENT-1942
>             Project: HttpComponents HttpClient
>          Issue Type: Wish
>          Components: HttpClient (async)
>    Affects Versions: 5.0 Beta1
>            Reporter: Ryan Schmitt
>            Priority: Major
>              Labels: stuck, volunteers-wanted
>             Fix For: Future
>
>
> It would be very helpful to me if the Apache client provided an 
> implementation of the [Reactive Streams|http://www.reactive-streams.org/] 
> spec, particularly as an implementation of the standard 
> [interfaces|https://search.maven.org/artifact/org.reactivestreams/reactive-streams/1.0.2/jar].
>  These interfaces are JDK6-compatible and have no other dependencies, but 
> they unlock interoperability with many other frameworks, such as RxJava.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to