Repository: cxf
Updated Branches:
  refs/heads/master 925140208 -> 35639ab59


[CXF-6833] Removing Observable providers which do not work asynchronously


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/35639ab5
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/35639ab5
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/35639ab5

Branch: refs/heads/master
Commit: 35639ab59aabf4cace331f49505b699f164df0b1
Parents: 9251402
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Aug 8 13:21:34 2017 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Aug 8 13:21:34 2017 +0100

----------------------------------------------------------------------
 .../cxf/jaxrs/rx/provider/ObservableReader.java |  61 ----------
 .../cxf/jaxrs/rx/provider/ObservableWriter.java | 119 -------------------
 .../cxf/jaxrs/rx/server/ObservableInvoker.java  |  43 +++++++
 .../jaxrs/rx/provider/ObservableWriterTest.java |  32 -----
 .../jaxrs/reactive/JAXRSObservableTest.java     |  21 ----
 .../jaxrs/reactive/ObservableServer.java        |   4 +-
 .../jaxrs/reactive/ObservableService.java       |   8 --
 7 files changed, 45 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
deleted file mode 100644
index c4423ae..0000000
--- 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.provider;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyReader;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.InjectionUtils;
-
-import rx.Observable;
-
-public class ObservableReader<T> implements MessageBodyReader<Observable<T>> {
-
-    @Context
-    private Providers providers;
-
-    @Override
-    public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, 
MediaType arg3) {
-        return true;
-    }
-
-    @Override
-    public Observable<T> readFrom(Class<Observable<T>> cls, Type t, 
Annotation[] anns, MediaType mt,
-                                  MultivaluedMap<String, String> headers, 
InputStream is)
-                                      throws IOException, 
WebApplicationException {
-        @SuppressWarnings("unchecked")
-        Class<T> actualCls = (Class<T>)InjectionUtils.getActualType(t);
-        final MessageBodyReader<T> mbr =
-            providers.getMessageBodyReader(actualCls, actualCls, anns, mt);
-        if (mbr == null) {
-            throw new ProcessingException("MBR is null");
-        }
-        return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, 
headers, is));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
deleted file mode 100644
index 33d3864..0000000
--- 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.rx.provider;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.ext.MessageBodyWriter;
-import javax.ws.rs.ext.Provider;
-import javax.ws.rs.ext.Providers;
-
-import org.apache.cxf.jaxrs.utils.ExceptionUtils;
-import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
-
-import rx.Observable;
-
-@Provider
-public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> {
-
-    @Context
-    private Providers providers;
-    private boolean writeSingleElementAsList;
-
-    @Override
-    public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, 
Annotation[] arg3, MediaType arg4) {
-        // TODO Auto-generated method stub
-        return -1;
-    }
-
-    @Override
-    public boolean isWriteable(Class<?> arg0, Type arg1, Annotation[] arg2, 
MediaType arg3) {
-        return true;
-    }
-
-    @Override
-    public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] 
anns, MediaType mt,
-                        MultivaluedMap<String, Object> headers, OutputStream 
os)
-                            throws IOException, WebApplicationException {
-        List<T> entities = new LinkedList<T>();
-        obs.subscribe(value -> entities.add(value),
-            throwable -> throwError(throwable));
-        if (!entities.isEmpty()) {
-
-            if (entities.get(0) instanceof List) {
-                List<T> allEntities = new LinkedList<T>();
-                for (T obj : entities) {
-                    @SuppressWarnings("unchecked")
-                    List<T> listT = (List<T>)obj;
-                    allEntities.addAll(listT);
-                }
-                writeToOutputStream(allEntities, anns, mt, headers, os);
-            } else if (entities.size() > 1 || writeSingleElementAsList) {
-                writeToOutputStream(entities, anns, mt, headers, os);
-            } else {
-                writeToOutputStream(entities.get(0), anns, mt, headers, os);
-            }
-        }
-    }
-
-    private void writeToOutputStream(Object value,
-                                     Annotation[] anns,
-                                     MediaType mt,
-                                     MultivaluedMap<String, Object> headers,
-                                     OutputStream os) {
-        Class<?> valueCls = value.getClass();
-        Type valueType = null;
-        if (value instanceof List) {
-            List<?> list = (List<?>)value;
-            valueType = new ParameterizedCollectionType(list.isEmpty() ? 
Object.class : list.get(0).getClass());
-        } else {
-            valueType = valueCls;
-        }
-        @SuppressWarnings("unchecked")
-        MessageBodyWriter<Object> writer =
-            
(MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, 
anns, mt);
-        if (writer == null) {
-            throwError(null);
-        }
-
-        try {
-            writer.writeTo(value, valueCls, valueType, anns, mt, headers, os);
-        } catch (IOException ex) {
-            throwError(ex);
-        }
-    }
-
-    private static void throwError(Throwable cause) {
-        throw ExceptionUtils.toInternalServerErrorException(cause, null);
-    }
-
-    public void setWriteSingleElementAsList(boolean writeSingleElementAsList) {
-        this.writeSingleElementAsList = writeSingleElementAsList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
new file mode 100644
index 0000000..59d0ca3
--- /dev/null
+++ 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableInvoker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.server;
+
+import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.message.Message;
+
+import rx.Observable;
+
+public class ObservableInvoker extends JAXRSInvoker {
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object 
result) {
+        if (result instanceof Observable) {
+            final Observable<?> obs = (Observable<?>)result;
+            final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
+            obs.subscribe(v -> asyncResponse.resume(v), t -> 
handleThrowable(asyncResponse, t));
+            return asyncResponse;
+        }
+        return null;
+    }
+
+    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable 
t) {
+        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
+        asyncResponse.resume(t);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
 
b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
deleted file mode 100644
index 045dcec..0000000
--- 
a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.jaxrs.rx.provider;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ObservableWriterTest extends Assert {
-
-
-    @Test
-    public void testIsWriteable() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
----------------------------------------------------------------------
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
index a0d8af8..39d8fd5 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java
@@ -34,7 +34,6 @@ import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
 import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
-import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import org.junit.BeforeClass;
@@ -67,21 +66,6 @@ public class JAXRSObservableTest extends 
AbstractBusClientServerTestBase {
     }
 
     @Test
-    public void testGetHelloWorldTextObservableSync() throws Exception {
-        String address = "http://localhost:" + PORT + "/observable/text";
-        WebClient wc = WebClient.create(address, Collections.singletonList(
-            new ObservableReader<Object>()));
-        GenericType<Observable<String>> genericResponseType =
-            new GenericType<Observable<String>>() {
-            };
-        Observable<String> obs = 
wc.accept("text/plain").get(genericResponseType);
-        obs.subscribe(s -> assertResponse(s));
-    }
-
-    private void assertResponse(String s) {
-        assertEquals("Hello, world!", s);
-    }
-    @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/observable/textJson";
         WebClient wc = WebClient.create(address,
@@ -96,11 +80,6 @@ public class JAXRSObservableTest extends 
AbstractBusClientServerTestBase {
         doTestGetHelloWorldJsonList(address);
     }
     @Test
-    public void testGetHelloWorldJsonImplicitList() throws Exception {
-        String address = "http://localhost:" + PORT + 
"/observable/textJsonImplicitList";
-        doTestGetHelloWorldJsonList(address);
-    }
-    @Test
     public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
         String address = "http://localhost:" + PORT + 
"/observable/textJsonImplicitListAsync";
         doTestGetHelloWorldJsonList(address);

http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
----------------------------------------------------------------------
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
index 825dc88..03f89ef 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java
@@ -29,7 +29,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
 import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
-import org.apache.cxf.jaxrs.rx.provider.ObservableWriter;
+import org.apache.cxf.jaxrs.rx.server.ObservableInvoker;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 
@@ -45,7 +45,7 @@ public class ObservableServer extends 
AbstractBusTestServerBase {
         // Make sure default JSONProvider is not loaded
         bus.setProperty("skip.default.json.provider.registration", true);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setProvider(new ObservableWriter<Object>());
+        sf.setInvoker(new ObservableInvoker());
         sf.setProvider(new JacksonJsonProvider());
         StreamingResponseProvider<HelloWorldBean> streamProvider = new 
StreamingResponseProvider<HelloWorldBean>();
         
streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));

http://git-wip-us.apache.org/repos/asf/cxf/blob/35639ab5/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
----------------------------------------------------------------------
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
index 4aebd1b..00783fd 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java
@@ -65,14 +65,6 @@ public class ObservableService {
 
     @GET
     @Produces("application/json")
-    @Path("textJsonImplicitList")
-    public Observable<HelloWorldBean> getJsonImplicitList() {
-        HelloWorldBean bean1 = new HelloWorldBean();
-        HelloWorldBean bean2 = new HelloWorldBean("Ciao");
-        return Observable.just(bean1, bean2);
-    }
-    @GET
-    @Produces("application/json")
     @Path("textJsonImplicitListAsync")
     public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
         final HelloWorldBean bean1 = new HelloWorldBean();

Reply via email to