[
https://issues.apache.org/jira/browse/HTTPCLIENT-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590695#comment-16590695
]
ASF GitHub Bot commented on HTTPCLIENT-1942:
--------------------------------------------
Github user rschmitt commented on a diff in the pull request:
https://github.com/apache/httpcomponents-core/pull/74#discussion_r212421284
--- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java ---
@@ -0,0 +1,176 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * An {@link AsyncResponseConsumer} that publishes the response body through
+ * a {@link Publisher}, as defined by the Reactive Streams specification. The
+ * response is represented as a {@link Message} consisting of a {@link
+ * HttpResponse} representing the headers and a {@link Publisher} representing
+ * the response body as an asynchronous stream of {@link ByteBuffer} instances.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
+
+ private final ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
+ private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
+ private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
+
+ private volatile BasicFuture<Void> responseCompletion;
+ private volatile HttpResponse informationResponse;
+ private volatile EntityDetails entityDetails;
+
+ /**
+ * Creates a {@code ReactiveResponseConsumer}.
+ */
+ public ReactiveResponseConsumer() {
+ this.responseFuture = new BasicFuture<>(null);
+ }
+
+ /**
+ * Creates a {@code ReactiveResponseConsumer} that will call back the supplied {@link FutureCallback} with a
+ * streamable response.
+ *
+ * @param responseCallback the callback to invoke when the response is available for consumption.
+ */
+ public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
+ this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
+ }
+
+ public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
+ return responseFuture;
+ }
+
+ /**
+ * Returns the intermediate (1xx) HTTP response if one was received.
+ *
+ * @return the information response, or {@code null} if none.
+ */
+ public HttpResponse getInformationResponse() {
+ return informationResponse;
+ }
+
+ /**
+ * Returns the response entity details.
+ *
+ * @return the entity details, or {@code null} if none.
+ */
+ public EntityDetails getEntityDetails() {
+ return entityDetails;
+ }
+
+ /**
+ * Returns the trailers received at the end of the response.
+ *
+ * @return a non-null list of zero or more trailers.
+ */
+ public List<Header> getTrailers() {
+ return trailers;
+ }
+
+ @Override
+ public void consumeResponse(
+ final HttpResponse response,
+ final EntityDetails entityDetails,
+ final HttpContext httpContext,
+ final FutureCallback<Void> resultCallback
+ ) {
+ this.entityDetails = entityDetails;
+ this.responseCompletion = new BasicFuture<>(resultCallback);
+ this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
--- End diff --
I'm not sure what you mean by `Consumer`, because I thought this entire
project was targeting JDK7. (RxJava provides a `Consumer<T>` interface, but the
Reactive Streams API does not.) Either way, I would argue that we *do* need to
return a `Future`, for the same reason that the `#execute` method returns a
`Future`: callers need a single thing that they can block on (or decorate with
callback logic) to wait for success, failure, or cancellation.
In the integration tests, there is a fairly consistent idiom:
```java
final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
requester.execute(request, consumer, Timeout.ofSeconds(2), null);
final Message<HttpResponse, Publisher<ByteBuffer>> response =
    consumer.getResponseFuture().get();
```
With a `Consumer`-based API, a caller might write:
```java
final BlockingQueue<Message<HttpResponse, Publisher<ByteBuffer>>> streamingResponse =
    new ArrayBlockingQueue<>(1);
final ReactiveResponseConsumer consumer =
    new ReactiveResponseConsumer(new Consumer<Message<HttpResponse, Publisher<ByteBuffer>>>() {
        @Override
        public void accept(final Message<HttpResponse, Publisher<ByteBuffer>> httpResponsePublisherMessage) {
            streamingResponse.offer(httpResponsePublisherMessage);
        }
    });
requester.execute(request, consumer, Timeout.ofSeconds(2), null);
final Message<HttpResponse, Publisher<ByteBuffer>> response =
    streamingResponse.take();
```
However, the above code will block forever if an exception occurs and the
response fails to come back. One way to fix this is by setting up a
`CompletableFuture` or `BasicFuture` and then completing it from *both*
callbacks:
```java
final CompletableFuture<Message<HttpResponse, Publisher<ByteBuffer>>> streamingResponse =
    new CompletableFuture<>();
final ReactiveResponseConsumer consumer =
    new ReactiveResponseConsumer(new Consumer<Message<HttpResponse, Publisher<ByteBuffer>>>() {
        @Override
        public void accept(final Message<HttpResponse, Publisher<ByteBuffer>> httpResponsePublisherMessage) {
            streamingResponse.complete(httpResponsePublisherMessage);
        }
    });
requester.execute(request, consumer, Timeout.ofSeconds(2), new FutureCallback<Void>() {
    @Override
    public void completed(Void result) {
        // TODO: Can this race with the other callback?
        streamingResponse.completeExceptionally(new RuntimeException("No stream returned"));
    }
    @Override
    public void failed(Exception ex) {
        streamingResponse.completeExceptionally(ex);
    }
    @Override
    public void cancelled() {
        streamingResponse.completeExceptionally(new CancellationException());
    }
});
final Message<HttpResponse, Publisher<ByteBuffer>> response =
    streamingResponse.join();
```
In short, constructing a correctly functioning `Future` for the streaming
response is definitely something I want to do on the caller's behalf, rather
than asking the caller to construct one manually from lower-level primitives.
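To make the shape of that idea concrete, here is a minimal, dependency-free sketch of a consumer that owns its own future and completes it from every terminal path. It is not the code in this pull request: `SketchConsumer` and its method names are hypothetical, and it uses `CompletableFuture` (JDK 8+) purely for brevity, whereas the real class is built on `BasicFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for a response consumer; not an httpcore class.
final class SketchConsumer<T> {

    // A single future that every terminal path completes.
    private final CompletableFuture<T> responseFuture = new CompletableFuture<>();

    // Callers block on (or poll) this one object for success, failure, or cancellation.
    Future<T> getResponseFuture() {
        return responseFuture;
    }

    // Success path: the streamable response has arrived.
    void responseReceived(final T response) {
        responseFuture.complete(response);
    }

    // Failure path: has no effect if the future is already completed.
    void failed(final Exception cause) {
        responseFuture.completeExceptionally(cause);
    }

    // Cancellation path: has no effect if the future is already completed.
    void cancelled() {
        responseFuture.cancel(false);
    }
}
```

Because only the first completion takes effect, a late `failed` or `cancelled` call after `responseReceived` cannot overwrite the result, so the caller never has to reason about which callback fires first.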
> Add support for Reactive Streams
> --------------------------------
>
> Key: HTTPCLIENT-1942
> URL: https://issues.apache.org/jira/browse/HTTPCLIENT-1942
> Project: HttpComponents HttpClient
> Issue Type: Wish
> Components: HttpClient (async)
> Affects Versions: 5.0 Beta1
> Reporter: Ryan Schmitt
> Priority: Major
> Labels: stuck, volunteers-wanted
> Fix For: Future
>
>
> It would be very helpful to me if the Apache client provided an
> implementation of the [Reactive Streams|http://www.reactive-streams.org/]
> spec, particularly as an implementation of the standard
> [interfaces|https://search.maven.org/artifact/org.reactivestreams/reactive-streams/1.0.2/jar].
> These interfaces are JDK6-compatible and have no other dependencies, but
> they unlock interoperability with many other frameworks, such as RxJava.