[
https://issues.apache.org/jira/browse/HTTPCLIENT-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594637#comment-16594637
]
ASF GitHub Bot commented on HTTPCLIENT-1942:
--------------------------------------------
Github user ok2c commented on a diff in the pull request:
https://github.com/apache/httpcomponents-core/pull/74#discussion_r213206278
--- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
@@ -0,0 +1,175 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.ProtocolException;
+import org.apache.hc.core5.http.nio.AsyncDataProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An asynchronous data producer that supports Reactive Streams.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
+
+ private static final int BUFFER_WINDOW_SIZE = 5;
+
+ private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
+ private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
+ private final AtomicBoolean complete = new AtomicBoolean(false);
+ private final Publisher<ByteBuffer> publisher;
+ private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
+ private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
+
+ public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
+ this.publisher = Args.notNull(publisher, "publisher");
+ }
+
+ @Override
+ public void onSubscribe(final Subscription subscription) {
+ if (this.subscription.getAndSet(subscription) != null) {
+ throw new IllegalStateException("Already subscribed");
+ }
+
+ subscription.request(BUFFER_WINDOW_SIZE);
+ }
+
+ @Override
+ public void onNext(final ByteBuffer byteBuffer) {
+ final byte[] copy = new byte[byteBuffer.remaining()];
+ byteBuffer.get(copy);
+ synchronized (buffers) {
+ buffers.add(ByteBuffer.wrap(copy));
+ }
+ signalReadiness();
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ subscription.set(null);
+ exception.set(throwable);
+ signalReadiness();
+ }
+
+ @Override
+ public void onComplete() {
+ subscription.set(null);
+ complete.set(true);
+ signalReadiness();
+ }
+
+ private void signalReadiness() {
+ final DataStreamChannel channel = requestChannel.get();
+ if (channel == null) {
+ throw new IllegalStateException("Output channel is not set");
+ }
+ channel.requestOutput();
+ }
+
+ @Override
+ public int available() {
+ if (exception.get() != null || complete.get()) {
+ return 1;
+ } else {
+ synchronized (buffers) {
+ int sum = 0;
+ for (final ByteBuffer buffer : buffers) {
+ sum += buffer.remaining();
+ }
+ return sum;
+ }
+ }
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ if (requestChannel.get() == null) {
+ requestChannel.set(channel);
+ publisher.subscribe(this);
+ }
+
+ final Throwable t = exception.get();
+ final Subscription s = subscription.get();
+ int buffersToReplenish = 0;
+ try {
+ synchronized (buffers) {
+ if (t != null) {
+ // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
--- End diff --
@rschmitt I'll refactor that bit once the change-set gets merged.
> Add support for Reactive Streams
> --------------------------------
>
> Key: HTTPCLIENT-1942
> URL: https://issues.apache.org/jira/browse/HTTPCLIENT-1942
> Project: HttpComponents HttpClient
> Issue Type: Wish
> Components: HttpClient (async)
> Affects Versions: 5.0 Beta1
> Reporter: Ryan Schmitt
> Priority: Major
> Labels: stuck, volunteers-wanted
> Fix For: Future
>
>
> It would be very helpful to me if the Apache client provided an
> implementation of the [Reactive Streams|http://www.reactive-streams.org/]
> spec, particularly as an implementation of the standard
> [interfaces|https://search.maven.org/artifact/org.reactivestreams/reactive-streams/1.0.2/jar].
> These interfaces are JDK6-compatible and have no other dependencies, but
> they unlock interoperability with many other frameworks, such as RxJava.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]