Repository: cxf Updated Branches: refs/heads/master cf11aa95e -> feba80454
CXF-7085: Introduce support for Server Sent Events (Client). Initial implementation. Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/feba8045 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/feba8045 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/feba8045 Branch: refs/heads/master Commit: feba8045486cd910b238065cbb3ee4a741198d76 Parents: cf11aa9 Author: reta <[email protected]> Authored: Fri Jun 23 17:37:23 2017 -0400 Committer: reta <[email protected]> Committed: Fri Jun 23 17:37:23 2017 -0400 ---------------------------------------------------------------------- parent/pom.xml | 2 +- rt/rs/sse/pom.xml | 7 +- .../jaxrs/sse/client/InboundSseEventImpl.java | 184 ++++++++++++++++++ .../sse/client/InboundSseEventListener.java | 31 +++ .../sse/client/InboundSseEventProcessor.java | 140 ++++++++++++++ .../sse/client/SseEventSourceBuilderImpl.java | 51 +++++ .../jaxrs/sse/client/SseEventSourceImpl.java | 190 +++++++++++++++++++ .../javax.ws.rs.sse.SseEventSource$Builder | 1 + .../systest/jaxrs/sse/AbstractSseBaseTest.java | 23 +++ .../cxf/systest/jaxrs/sse/AbstractSseTest.java | 29 +++ 10 files changed, 656 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index bd95eab..e26fee6 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -111,7 +111,7 @@ <cxf.geronimo.transaction.version>3.1.4</cxf.geronimo.transaction.version> <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version> <cxf.javassist.version>3.19.0-GA</cxf.javassist.version> - <cxf.javax.ws.rs.version>2.1-m08</cxf.javax.ws.rs.version> + <cxf.javax.ws.rs.version>2.1-m09</cxf.javax.ws.rs.version> <cxf.jaxb.version>2.2.11</cxf.jaxb.version> <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version> <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version> http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/sse/pom.xml b/rt/rs/sse/pom.xml index d4c9f17..745a493 100644 --- a/rt/rs/sse/pom.xml +++ b/rt/rs/sse/pom.xml @@ -43,6 +43,12 @@ </dependency> <dependency> <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-client</artifactId> + <version>${project.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> <artifactId>cxf-integration-cdi</artifactId> <version>${project.version}</version> <optional>true</optional> @@ -60,7 +66,6 @@ <dependency> <groupId>org.atmosphere</groupId> <artifactId>atmosphere-runtime</artifactId> - <version>${cxf.atmosphere.version}</version> </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java new file mode 100644 index 0000000..4d4eab4 --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventImpl.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.client; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.OptionalLong; +import java.util.logging.Logger; + +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.sse.InboundSseEvent; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.jaxrs.client.ClientProviderFactory; +import org.apache.cxf.message.Message; + +public class InboundSseEventImpl implements InboundSseEvent { + private final String id; + private final String name; + private final String comment; + private final long reconnectDelay; + private final boolean reconnectDelaySet; + private final String data; + private final ClientProviderFactory factory; + private final Message message; + + static class Builder { + private static final Logger LOG = LogUtils.getL7dLogger(Builder.class); + + private final String name; + private String id; + private String comment; + private OptionalLong reconnectDelay = OptionalLong.empty(); + private String data; + + Builder(String name) { + this.name = name; + } + + Builder id(String id) { + this.id = id; + return this; + } + + Builder comment(String comment) { + this.comment = comment; + return this; + } + + Builder reconnectDelay(String reconnectDelay) { + try { + this.reconnectDelay = OptionalLong.of(Long.parseLong(reconnectDelay)); + } catch (final NumberFormatException ex) { + LOG.warning("Unable to parse reconnectDelay, long number expected: " + ex.getMessage()); + } + + return this; + } + + Builder data(String data) { + this.data = data; + return this; + } + + InboundSseEvent build(ClientProviderFactory factory, Message message) { + return new InboundSseEventImpl(id, name, comment, reconnectDelay.orElse(0), + reconnectDelay.isPresent(), data, factory, message); + } + } + + InboundSseEventImpl(String id, String name, String comment, long reconnectDelay, boolean reconnectDelaySet, + String data, ClientProviderFactory factory, Message message) { + this.id = id; + this.name = name; + this.comment = comment; + this.reconnectDelay = reconnectDelay; + this.reconnectDelaySet = reconnectDelaySet; + this.data = data; + this.factory = factory; + this.message = message; + } + + @Override + public String getId() { + return id; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getComment() { + return comment; + } + + @Override + public long getReconnectDelay() { + return reconnectDelay; + } + + @Override + public boolean isReconnectDelaySet() { + return reconnectDelaySet; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public String readData() { + return data; + } + + @Override + public <T> T readData(Class<T> type) { + return read(type, type, MediaType.WILDCARD_TYPE); + } + + @Override + @SuppressWarnings("unchecked") + public <T> T readData(GenericType<T> type) { + return read((Class<T>)type.getRawType(), type.getType(), MediaType.WILDCARD_TYPE); + } + + @Override + public <T> T readData(Class<T> messageType, MediaType mediaType) { + return read(messageType, messageType, mediaType); + } + + @Override + @SuppressWarnings("unchecked") + public <T> T readData(GenericType<T> type, MediaType mediaType) { + return read((Class<T>)type.getRawType(), type.getType(), mediaType); + } + + private <T> T read(Class<T> messageType, Type type, MediaType mediaType) { + if (data == null) { + return null; + } + + final Annotation[] annotations = new Annotation[0]; + final MultivaluedMap<String, String> headers = new MultivaluedHashMap<>(0); + + final MessageBodyReader<T> reader = factory.createMessageBodyReader(messageType, type, + annotations, mediaType, message); + + if (reader == null) { + throw new RuntimeException("No suitable message body reader for class: " + messageType.getName()); + } + + try (final ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) { + return reader.readFrom(messageType, type, annotations, mediaType, headers, is); + } catch (final IOException ex) { + throw new RuntimeException("Unable to read data of type " + messageType.getName(), ex); + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java new file mode 100644 index 0000000..b61d51a --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.jaxrs.sse.client; + +import javax.ws.rs.sse.InboundSseEvent; + +/** + * Flow/RxJava like listener for processing SSE events + */ +interface InboundSseEventListener { + void onNext(InboundSseEvent event); + void onError(Throwable ex); + void onComplete(); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java new file mode 100644 index 0000000..cf2346a --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/InboundSseEventProcessor.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.client; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.sse.InboundSseEvent; + +import org.apache.cxf.common.util.StringUtils; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.jaxrs.client.ClientProviderFactory; +import org.apache.cxf.jaxrs.impl.ResponseImpl; +import org.apache.cxf.message.Message; + +public class InboundSseEventProcessor { + public static final String SERVER_SENT_EVENTS = "text/event-stream"; + public static final MediaType SERVER_SENT_EVENTS_TYPE = MediaType.valueOf(SERVER_SENT_EVENTS); + + private static final String COMMENT = ": "; + private static final String EVENT = " "; + private static final String ID = "id: "; + private static final String RETRY = "retry: "; + private static final String DATA = "data: "; + + private final Endpoint endpoint; + private final InboundSseEventListener listener; + private final ExecutorService executor; + private volatile boolean closed = false; + + protected InboundSseEventProcessor(Endpoint endpoint, InboundSseEventListener listener) { + this.endpoint = endpoint; + this.listener = listener; + this.executor = Executors.newSingleThreadExecutor(); + } + + void run(final Response response) { + final InputStream is = response.readEntity(InputStream.class); + final ClientProviderFactory factory = ClientProviderFactory.getInstance(endpoint); + + Message message = null; + if (response instanceof ResponseImpl) { + message = ((ResponseImpl)response).getOutMessage(); + } + + executor.submit(process(response, is, factory, message)); + } + + private Callable<?> process(Response response, InputStream is, ClientProviderFactory factory, Message message) { + return () -> { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + String line = null; + InboundSseEventImpl.Builder builder = null; + + while ((line = reader.readLine()) != null && !Thread.interrupted() && !closed) { + if (!StringUtils.isEmpty(line) && line.startsWith(EVENT)) { + if (builder == null) { + builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length())); + } else { + final InboundSseEvent event = builder.build(factory, message); + builder = new InboundSseEventImpl.Builder(line.substring(EVENT.length())); + + if (listener != null) { + listener.onNext(event); + } + } + } else if (builder != null) { + if (line.startsWith(ID)) { + builder.id(line.substring(ID.length())); + } else if (line.startsWith(COMMENT)) { + builder.id(line.substring(COMMENT.length())); + } else if (line.startsWith(RETRY)) { + builder.reconnectDelay(line.substring(RETRY.length())); + } else if (line.startsWith(DATA)) { + builder.data(line.substring(DATA.length())); + } + } + } + + if (listener != null) { + if (builder != null) { + listener.onNext(builder.build(factory, message)); + } + + // complete the stream + listener.onComplete(); + } + } catch (final Exception ex) { + if (listener != null) { + listener.onError(ex); + } + } + + if (response != null) { + response.close(); + } + + closed = true; + return null; + }; + } + + boolean close(long timeout, TimeUnit unit) { + if (closed) { + return true; + } + + try { + closed = true; + executor.shutdown(); + return executor.awaitTermination(timeout, unit); + } catch (final InterruptedException ex) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java new file mode 100644 index 0000000..b3fa0dc --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceBuilderImpl.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.client; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.sse.SseEventSource; +import javax.ws.rs.sse.SseEventSource.Builder; + +public class SseEventSourceBuilderImpl extends SseEventSource.Builder { + private static final long DEFAULT_RECONNECT_DELAY_IN_MS = 500; + + private long delay = DEFAULT_RECONNECT_DELAY_IN_MS; + private TimeUnit unit = TimeUnit.MILLISECONDS; + private WebTarget target; + + @Override + public SseEventSource build() { + return new SseEventSourceImpl(target, delay, unit); + } + + @Override + public Builder reconnectingEvery(long delay, TimeUnit unit) { + this.delay = delay; + this.unit = unit; + return this; + } + + @Override + protected Builder target(WebTarget target) { + this.target = target; + return this; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java new file mode 100644 index 0000000..e9e5c1d --- /dev/null +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.sse.client; + +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.logging.Logger; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.jaxrs.client.WebClient; + +/** + * SSE Event Source implementation + */ +public class SseEventSourceImpl implements SseEventSource { + private static final Logger LOG = LogUtils.getL7dLogger(SseEventSourceImpl.class); + + private final WebTarget target; + private final long delay; + private final TimeUnit unit; + private final Collection<InboundSseEventListener> listeners = new CopyOnWriteArrayList<>(); + + // It may happen that open() and close() could be called on separate threads + private volatile InboundSseEventProcessor processor; + + private class InboundSseEventListenerImpl implements InboundSseEventListener { + private final Consumer<InboundSseEvent> onEvent; + private final Consumer<Throwable> onError; + private final Runnable onComplete; + + InboundSseEventListenerImpl(Consumer<InboundSseEvent> e) { + this(e, ex -> {}, () -> {}); + } + + InboundSseEventListenerImpl(Consumer<InboundSseEvent> e, Consumer<Throwable> t) { + this(e, t, () -> {}); + } + + InboundSseEventListenerImpl(Consumer<InboundSseEvent> e, Consumer<Throwable> t, Runnable c) { + this.onEvent = e; + this.onError = t; + this.onComplete = c; + } + + @Override + public void onNext(InboundSseEvent event) { + onEvent.accept(event); + } + + @Override + public void onError(Throwable ex) { + onError.accept(ex); + } + + @Override + public void onComplete() { + onComplete.run(); + } + } + + private final AtomicReference<SseSourceState> state = + new AtomicReference<>(SseSourceState.CLOSED); + + private enum SseSourceState { + OPENING, + OPENED, + CLOSED + } + + SseEventSourceImpl(WebTarget target, long delay, TimeUnit unit) { + this.target = target; + this.delay = delay; + this.unit = unit; + } + + @Override + public void register(Consumer<InboundSseEvent> onEvent) { + listeners.add(new InboundSseEventListenerImpl(onEvent)); + } + + @Override + public void register(Consumer<InboundSseEvent> onEvent, Consumer<Throwable> onError) { + listeners.add(new InboundSseEventListenerImpl(onEvent, onError)); + } + + @Override + public void register(Consumer<InboundSseEvent> onEvent, Consumer<Throwable> onError, Runnable onComplete) { + listeners.add(new InboundSseEventListenerImpl(onEvent, onError, onComplete)); + } + + @Override + public void open() { + if (!state.compareAndSet(SseSourceState.CLOSED, SseSourceState.OPENING)) { + throw new IllegalStateException("The SseEventSource is already in " + state.get() + " state"); + } + + Response response = null; + try { + response = target + .request(MediaType.SERVER_SENT_EVENTS) + .get(); + + final Endpoint endpoint = WebClient.getConfig(target).getEndpoint(); + processor = new InboundSseEventProcessor(endpoint, + new InboundSseEventListener() { + @Override + public void onNext(InboundSseEvent event) { + listeners.forEach(listener -> listener.onNext(event)); + } + + @Override + public void onError(Throwable ex) { + listeners.forEach(listener -> listener.onError(ex)); + if (delay > 0 && unit != null) { + // TODO: Schedule reconnect here + } + } + + @Override + public void onComplete() { + listeners.forEach(InboundSseEventListener::onComplete); + } + } + ); + + processor.run(response); + state.compareAndSet(SseSourceState.OPENING, SseSourceState.OPENED); + + LOG.fine("Opened SSE connection to " + target.getUri()); + } catch (final Exception ex) { + state.compareAndSet(SseSourceState.OPENING, SseSourceState.CLOSED); + LOG.fine("Failed to open SSE connection to " + target.getUri() + ". " + ex.getMessage()); + + if (response != null) { + response.close(); + } + + listeners.forEach(listener -> listener.onError(ex)); + } + } + + @Override + public boolean isOpen() { + return state.get() == SseSourceState.OPENED; + } + + @Override + public boolean close(long timeout, TimeUnit unit) { + if (state.get() == SseSourceState.CLOSED) { + return true; + } + + if (!state.compareAndSet(SseSourceState.OPENED, SseSourceState.CLOSED)) { + throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get() + " state"); + } + + // Should never happen + if (processor == null) { + return true; + } + + return processor.close(timeout, unit); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder ---------------------------------------------------------------------- diff --git a/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder b/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder new file mode 100644 index 0000000..ad2d85a --- /dev/null +++ b/rt/rs/sse/src/main/resources/META-INF/services/javax.ws.rs.sse.SseEventSource$Builder @@ -0,0 +1 @@ +org.apache.cxf.jaxrs.sse.client.SseEventSourceBuilderImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java index aa80351..87425b5 100644 --- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseBaseTest.java @@ -19,8 +19,11 @@ package org.apache.cxf.systest.jaxrs.sse; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import com.fasterxml.jackson.core.JsonProcessingException; @@ -52,5 +55,25 @@ public abstract class AbstractSseBaseTest extends AbstractBusClientServerTestBas return createWebClient(url, MediaType.SERVER_SENT_EVENTS); } + protected WebTarget createWebTarget(final String url) { + return ClientBuilder + .newClient() + .property("http.receive.timeout", 8000) + .register(JacksonJsonProvider.class) + .target("http://localhost:" + getPort() + url); + } + + protected void awaitEvents(long timeout, final Collection<?> events, int size) throws InterruptedException { + final long sleep = timeout / 10; + + for (int i = 0; i < timeout; i += sleep) { + if (events.size() == size) { + break; + } else { + Thread.sleep(sleep); + } + } + } + protected abstract int getPort(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/feba8045/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java ---------------------------------------------------------------------- diff --git a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java index 0e2e723..98c530b 100644 --- a/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java +++ b/systests/rs-sse/src/test/java/org/apache/cxf/systest/jaxrs/sse/AbstractSseTest.java @@ -18,15 +18,24 @@ */ package org.apache.cxf.systest.jaxrs.sse; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.Consumer; + +import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; import com.fasterxml.jackson.core.JsonProcessingException; import org.junit.Test; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItems; public abstract class AbstractSseTest extends AbstractSseBaseTest { @Test @@ -51,4 +60,24 @@ public abstract class AbstractSseTest extends AbstractSseBaseTest { r.close(); } + + @Test + public void testBooksStreamIsReturnedFromInboundSseEvents() throws JsonProcessingException, InterruptedException { + final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0"); + final Collection<Book> books = new ArrayList<>(); + + try (final SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register(collect(books), System.out::println); + eventSource.open(); + // Give the SSE stream some time to collect all events + awaitEvents(3000, books, 4); + } + + assertThat(books, hasItems(new Book("New Book #1", 1), new Book("New Book #2", 2), + new Book("New Book #3", 3), new Book("New Book #4", 4))); + } + + private static Consumer<InboundSseEvent> collect(final Collection< Book > books) { + return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE)); + } }
