[ https://issues.apache.org/jira/browse/HTTPCLIENT-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594186#comment-16594186 ]

ASF GitHub Bot commented on HTTPCLIENT-1942:
--------------------------------------------

Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r213104118
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, 
Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new 
AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new 
AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new 
AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // 
This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws 
IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM 
(without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    `H2StreamResetException` works perfectly, except for the additional 
dependency edge from `:httpcore5-reactive` to `:httpcore5-h2`, which I think is 
acceptable.


> Add support for Reactive Streams
> --------------------------------
>
>                 Key: HTTPCLIENT-1942
>                 URL: https://issues.apache.org/jira/browse/HTTPCLIENT-1942
>             Project: HttpComponents HttpClient
>          Issue Type: Wish
>          Components: HttpClient (async)
>    Affects Versions: 5.0 Beta1
>            Reporter: Ryan Schmitt
>            Priority: Major
>              Labels: stuck, volunteers-wanted
>             Fix For: Future
>
>
> It would be very helpful to me if the Apache client provided an 
> implementation of the [Reactive Streams|http://www.reactive-streams.org/] 
> spec, particularly as an implementation of the standard 
> [interfaces|https://search.maven.org/artifact/org.reactivestreams/reactive-streams/1.0.2/jar].
> These interfaces are JDK6-compatible and have no other dependencies, but 
> they unlock interoperability with many other frameworks, such as RxJava.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to