Repository: cxf
Updated Branches:
  refs/heads/master 584df3ae1 -> 4bf780105


Experimenting with a Spark socket receiver, finalizing the demo work for now


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/4bf78010
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/4bf78010
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/4bf78010

Branch: refs/heads/master
Commit: 4bf780105271e91451446a2d96032249486b32de
Parents: 584df3a
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Wed Sep 21 14:45:49 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Wed Sep 21 14:45:49 2016 +0100

----------------------------------------------------------------------
 .../release/samples/jax_rs/spark/README.txt     |  28 +++-
 .../main/release/samples/jax_rs/spark/pom.xml   |   1 -
 .../src/main/java/demo/jaxrs/server/Server.java |  50 ------
 .../main/java/demo/jaxrs/server/SparkJob.java   |  36 -----
 .../jaxrs/server/SparkStreamingListener.java    |  93 -----------
 .../demo/jaxrs/server/SparkStreamingOutput.java |  73 ---------
 .../demo/jaxrs/server/StreamingService.java     | 158 ------------------
 .../demo/jaxrs/server/StringListReceiver.java   |  44 -----
 .../java/demo/jaxrs/server/simple/Server.java   |  50 ++++++
 .../java/demo/jaxrs/server/simple/SparkJob.java |  36 +++++
 .../server/simple/SparkStreamingListener.java   |  93 +++++++++++
 .../server/simple/SparkStreamingOutput.java     |  73 +++++++++
 .../jaxrs/server/simple/StreamingService.java   | 160 +++++++++++++++++++
 .../jaxrs/server/simple/StringListReceiver.java |  44 +++++
 .../java/demo/jaxrs/server/socket/Server.java   | 120 ++++++++++++++
 .../java/demo/jaxrs/server/socket/SparkJob.java |  47 ++++++
 .../server/socket/SparkStreamingOutput.java     |  47 ++++++
 .../jaxrs/server/socket/StreamingService.java   | 105 ++++++++++++
 18 files changed, 799 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt 
b/distribution/src/main/release/samples/jax_rs/spark/README.txt
index b10a44b..64c377d 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -1,11 +1,30 @@
 JAX-RS Spark Streaming Demo 
 ===========================
 
-This demo demonstrates how to connect HTTP and Spark streams with JAX-RS
+This demo demonstrates how to connect HTTP and Spark streams with JAX-RS.
+The demo accept simple Strings or binary attachments which are processed with 
Tika.
+In both cases a list of strings is pushed into a Spark Streaming Pipeline with 
the 
+pipeline output response streamed back to the HTTP client.
 
 Build the demo with "mvn install" and start it with
 
-mvn exec:java
+mvn exec:java -Dexec.mainClass=demo.jaxrs.server.simple.Server
+(uses Spark Receiver initialized with a list of strings)
+
+or 
+
+mvn exec:java -Dexec.mainClass=demo.jaxrs.server.simple.Server 
-DexecArgs=-receiverType=queue
+(Uses Spark Queue Receiver initialized with a parallelized data set)
+
+In both cases a new streaming context is created on every request. 
+
+You cam also try: 
+
+mvn exec:java -Dexec.mainClass=demo.jaxrs.server.socket.Server
+
+(Uses a client socket receiver - JAX-RS server will push a list of strings to 
it 
+and will write down the response data it gets back)
+
 
 Next do: 
 
@@ -13,9 +32,10 @@ Next do:
 
 curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello 
Spark" http://localhost:9000/spark/stream
 
-2. PDF processing:
+2. PDF/ODT/ODP processing:
 
-Open multipart.html located in src/main/resources, locate any PDF file 
available on the local disk and upload.
+Open multipart.html located in src/main/resources, locate any PDF or 
OpenOffice text or presentation file available 
+on the local disk and upload.
 
 Note Spark restricts that only a single streaming context can be active in JVM 
at a given moment of time. 
 This is the error which will be logged if you try to access the demo server 
concurrently:

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml 
b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
index ca9ad56..e504d67 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
@@ -90,7 +90,6 @@
                 </executions>
                 <configuration>
                     <executable>java</executable>
-                    <mainClass>demo.jaxrs.server.Server</mainClass>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
deleted file mode 100644
index 473a534..0000000
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package demo.jaxrs.server;
-
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-
-
-public class Server {
-
-    protected Server(String args[]) throws Exception {
-        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-        sf.setResourceClasses(StreamingService.class);
-        
-        String receiverType = args.length == 1 && 
args[0].equals("-receiverType=queue") ?
-            "queue" : "string";
-        sf.setResourceProvider(StreamingService.class, 
-            new SingletonResourceProvider(new StreamingService(receiverType)));
-        sf.setAddress("http://localhost:9000/spark";);
-
-        sf.create();
-    }
-
-    public static void main(String args[]) throws Exception {
-        new Server(args);
-        System.out.println("Server ready...");
-        Thread.sleep(60 * 60 * 1000);
-        System.out.println("Server exiting");
-        System.exit(0);
-    }
-    
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
deleted file mode 100644
index 010ddf3..0000000
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class SparkJob implements Runnable {
-    private AsyncResponse ac;
-    private SparkStreamingListener sparkListener;
-    public SparkJob(AsyncResponse ac, SparkStreamingListener sparkListener) {
-        this.ac = ac;
-        this.sparkListener = sparkListener;
-    }
-    @Override
-    public void run() {
-        sparkListener.waitForBatchStarted();
-        ac.resume(sparkListener.getStreamOut());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
deleted file mode 100644
index 6cb68e8..0000000
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import org.apache.spark.streaming.scheduler.StreamingListener;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
-import 
org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
-import 
org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
-
-public class SparkStreamingListener implements StreamingListener {
-    private SparkStreamingOutput streamOutput;
-    private boolean batchStarted;
-    private long batchStartAt;
-    
-    public SparkStreamingListener(SparkStreamingOutput streamOutput) {
-        this.streamOutput = streamOutput;
-    }
-
-    @Override
-    public void onBatchCompleted(StreamingListenerBatchCompleted event) {
-        System.out.println("Batch processing time in millisecs: " + 
(System.currentTimeMillis() - batchStartAt));
-        
-        streamOutput.setSparkBatchCompleted();
-    }
-
-    @Override
-    public synchronized void onBatchStarted(StreamingListenerBatchStarted 
event) {
-        batchStarted = true;
-        batchStartAt = System.currentTimeMillis();
-        notify();
-    }
-
-    @Override
-    public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
-    }
-
-    @Override
-    public void 
onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
-    }
-
-    @Override
-    public void 
onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
-    }
-
-    @Override
-    public void onReceiverError(StreamingListenerReceiverError event) {
-    }
-
-    @Override
-    public void onReceiverStarted(StreamingListenerReceiverStarted event) {
-    }
-
-    @Override
-    public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
-    }
-
-    public SparkStreamingOutput getStreamOut() {
-        return streamOutput;
-    }
-
-    public synchronized void waitForBatchStarted() {
-        while (!batchStarted) {
-            try {
-                this.wait();
-            } catch (InterruptedException ex) {
-                // continue
-            }
-        }
-        
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
deleted file mode 100644
index 300a0f7..0000000
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.StreamingOutput;
-
-import org.apache.cxf.common.util.StringUtils;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class SparkStreamingOutput implements StreamingOutput {
-    private BlockingQueue<String> responseQueue = new 
LinkedBlockingQueue<String>();
-    
-    private JavaStreamingContext jssc;
-    private volatile boolean sparkBatchCompleted;
-    private volatile boolean outputWriteDone;
-    private long startAt;
-    public SparkStreamingOutput(JavaStreamingContext jssc) {
-        this.jssc = jssc;
-        this.startAt = System.currentTimeMillis();
-    }
-
-    @Override
-    public void write(final OutputStream output) throws IOException, 
WebApplicationException {
-        while (!sparkBatchCompleted || !outputWriteDone || 
!responseQueue.isEmpty()) {
-            try {
-                String responseEntry = responseQueue.poll(1, 
TimeUnit.MILLISECONDS);
-                if (responseEntry != null) {
-                    outputWriteDone = true;
-                    output.write(StringUtils.toBytesUTF8(responseEntry));
-                    output.flush();
-                }
-            } catch (InterruptedException e) {
-                // continue;
-            }
-        }
-        
-        jssc.stop(false);
-        jssc.close();
-        System.out.println("Total processing time in millisecs: " + 
(System.currentTimeMillis() - startAt));
-    }
-    
-    
-    public void setSparkBatchCompleted() {
-        this.sparkBatchCompleted = true;
-    }
-
-    public void addResponseEntry(String value) {
-        responseQueue.add(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
deleted file mode 100644
index ae5b8dd..0000000
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.jaxrs.ext.multipart.Attachment;
-import org.apache.cxf.jaxrs.ext.multipart.Multipart;
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-
-@Path("/")
-public class StreamingService {
-    private static final Map<String, MediaType> MEDIA_TYPE_TABLE;
-    static {
-        MEDIA_TYPE_TABLE = new HashMap<String, MediaType>();
-        MEDIA_TYPE_TABLE.put("pdf", MediaType.valueOf("application/pdf"));
-        MEDIA_TYPE_TABLE.put("odt", 
MediaType.valueOf("application/vnd.oasis.opendocument.text"));
-        MEDIA_TYPE_TABLE.put("odp", 
MediaType.valueOf("application/vnd.oasis.opendocument.presentation"));
-    }
-    private Executor executor = new ThreadPoolExecutor(5, 5, 0, 
TimeUnit.SECONDS,
-                                                       new 
ArrayBlockingQueue<Runnable>(10));
-    
-    private String receiverType;
-    
-    public StreamingService(String receiverType) {
-        this.receiverType = receiverType;
-    }
-    
-    @POST
-    @Path("/multipart")
-    @Consumes("multipart/form-data")
-    @Produces("text/plain")
-    public void processMultipartStream(@Suspended AsyncResponse async, 
-                                       @Multipart("file") Attachment att) {
-        MediaType mediaType = att.getContentType();
-        if (mediaType == null) {
-            String fileName = att.getContentDisposition().getFilename();
-            if (fileName != null) {
-                int extDot = fileName.lastIndexOf('.');
-                if (extDot > 0) {
-                    mediaType = MEDIA_TYPE_TABLE.get(fileName.substring(extDot 
+ 1));
-                }
-            }
-        }
-        
-        TikaContentExtractor tika = new TikaContentExtractor();
-        TikaContent tikaContent = 
tika.extract(att.getObject(InputStream.class),
-                                               mediaType);
-        processStream(async, 
SparkUtils.getStringsFromString(tikaContent.getContent()));
-    }
-    
-    @POST
-    @Path("/stream")
-    @Consumes("text/plain")
-    @Produces("text/plain")
-    public void processSimpleStream(@Suspended AsyncResponse async, 
InputStream is) {
-        processStream(async, SparkUtils.getStringsFromInputStream(is));
-    }
-
-    private void processStream(AsyncResponse async, List<String> inputStrings) 
{
-        try {
-            SparkConf sparkConf = new SparkConf().setMaster("local[*]")
-                .setAppName("JAX-RS Spark Connect " + 
SparkUtils.getRandomId());
-            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1)); 
-            
-            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
-            SparkStreamingListener sparkListener =  new 
SparkStreamingListener(streamOut);
-            jssc.addStreamingListener(sparkListener);
-            
-            JavaDStream<String> receiverStream = null;
-            if ("queue".equals(receiverType)) {
-               Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
-               for (int i = 0; i < 30; i++) {
-                   rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
-               }
-               receiverStream = jssc.queueStream(rddQueue);
-            } else {
-               receiverStream = jssc.receiverStream(new 
StringListReceiver(inputStrings));
-            }
-            
-            JavaPairDStream<String, Integer> wordCounts = 
SparkUtils.createOutputDStream(receiverStream);
-            wordCounts.foreachRDD(new OutputFunction(streamOut));
-            jssc.start();
-                                                    
-            executor.execute(new SparkJob(async, sparkListener));
-        } catch (Exception ex) {
-            // the compiler does not allow to catch SparkException directly
-            if (ex instanceof SparkException) {
-                async.cancel(60);
-            } else {
-                async.resume(new WebApplicationException(ex));
-            }
-        }
-    }
-    
-    
-    private static class OutputFunction implements 
VoidFunction<JavaPairRDD<String, Integer>> {
-        private static final long serialVersionUID = 1L;
-        private SparkStreamingOutput streamOut;
-        OutputFunction(SparkStreamingOutput streamOut) {
-            this.streamOut = streamOut;
-        }
-        @Override
-        public void call(JavaPairRDD<String, Integer> rdd) {
-            for (Map.Entry<String, Integer> entry : 
rdd.collectAsMap().entrySet()) {
-                String value = entry.getKey() + " : " + entry.getValue() + 
"\n";
-                streamOut.addResponseEntry(value);
-            }
-        }
-        
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
deleted file mode 100644
index 55b6428..0000000
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.util.List;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-public class StringListReceiver extends Receiver<String> {
-
-    private static final long serialVersionUID = 1L;
-    private List<String> inputStrings;
-    
-    public StringListReceiver(List<String> inputStrings) {
-        super(StorageLevel.MEMORY_ONLY());
-        this.inputStrings = inputStrings;
-    }
-    @Override
-    public void onStart() {
-        super.store(inputStrings.iterator());
-    }
-    @Override
-    public void onStop() {
-        // complete
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java
new file mode 100644
index 0000000..cbe4008
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package demo.jaxrs.server.simple;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+
+
+public class Server {
+
+    protected Server(String args[]) throws Exception {
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setResourceClasses(StreamingService.class);
+        
+        String receiverType = args.length == 1 && 
args[0].equals("-receiverType=queue") ?
+            "queue" : "string";
+        sf.setResourceProvider(StreamingService.class, 
+            new SingletonResourceProvider(new StreamingService(receiverType)));
+        sf.setAddress("http://localhost:9000/spark";);
+
+        sf.create();
+    }
+
+    public static void main(String args[]) throws Exception {
+        new Server(args);
+        System.out.println("Server ready...");
+        Thread.sleep(60 * 60 * 1000);
+        System.out.println("Server exiting");
+        System.exit(0);
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java
new file mode 100644
index 0000000..84a00c7
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.simple;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class SparkJob implements Runnable {
+    private AsyncResponse ac;
+    private SparkStreamingListener sparkListener;
+    public SparkJob(AsyncResponse ac, SparkStreamingListener sparkListener) {
+        this.ac = ac;
+        this.sparkListener = sparkListener;
+    }
+    @Override
+    public void run() {
+        sparkListener.waitForBatchStarted();
+        ac.resume(sparkListener.getStreamOut());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java
new file mode 100644
index 0000000..216f3f8
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.simple;
+
+import org.apache.spark.streaming.scheduler.StreamingListener;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
+import 
org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
+import 
org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
+
+public class SparkStreamingListener implements StreamingListener {
+    private SparkStreamingOutput streamOutput;
+    private boolean batchStarted;
+    private long batchStartAt;
+    
+    public SparkStreamingListener(SparkStreamingOutput streamOutput) {
+        this.streamOutput = streamOutput;
+    }
+
+    @Override
+    public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+        System.out.println("Batch processing time in millisecs: " + 
(System.currentTimeMillis() - batchStartAt));
+        
+        streamOutput.setSparkBatchCompleted();
+    }
+
+    @Override
+    public synchronized void onBatchStarted(StreamingListenerBatchStarted 
event) {
+        batchStarted = true;
+        batchStartAt = System.currentTimeMillis();
+        notify();
+    }
+
+    @Override
+    public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
+    }
+
+    @Override
+    public void 
onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
+    }
+
+    @Override
+    public void 
onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
+    }
+
+    @Override
+    public void onReceiverError(StreamingListenerReceiverError event) {
+    }
+
+    @Override
+    public void onReceiverStarted(StreamingListenerReceiverStarted event) {
+    }
+
+    @Override
+    public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
+    }
+
+    public SparkStreamingOutput getStreamOut() {
+        return streamOutput;
+    }
+
+    public synchronized void waitForBatchStarted() {
+        while (!batchStarted) {
+            try {
+                this.wait();
+            } catch (InterruptedException ex) {
+                // continue
+            }
+        }
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java
new file mode 100644
index 0000000..67e2c84
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.simple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class SparkStreamingOutput implements StreamingOutput {
+    private BlockingQueue<String> responseQueue = new 
LinkedBlockingQueue<String>();
+    
+    private JavaStreamingContext jssc;
+    private volatile boolean sparkBatchCompleted;
+    private volatile boolean outputWriteDone;
+    private long startAt;
+    public SparkStreamingOutput(JavaStreamingContext jssc) {
+        this.jssc = jssc;
+        this.startAt = System.currentTimeMillis();
+    }
+
+    @Override
+    public void write(final OutputStream output) throws IOException, 
WebApplicationException {
+        while (!sparkBatchCompleted || !outputWriteDone || 
!responseQueue.isEmpty()) {
+            try {
+                String responseEntry = responseQueue.poll(1, 
TimeUnit.MILLISECONDS);
+                if (responseEntry != null) {
+                    outputWriteDone = true;
+                    output.write(StringUtils.toBytesUTF8(responseEntry));
+                    output.flush();
+                }
+            } catch (InterruptedException e) {
+                // continue;
+            }
+        }
+        
+        jssc.stop(false);
+        jssc.close();
+        System.out.println("Total processing time in millisecs: " + 
(System.currentTimeMillis() - startAt));
+    }
+    
+    
+    public void setSparkBatchCompleted() {
+        this.sparkBatchCompleted = true;
+    }
+
+    public void addResponseEntry(String value) {
+        responseQueue.add(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
new file mode 100644
index 0000000..a561bc0
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.simple;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import demo.jaxrs.server.SparkUtils;
+
+
+@Path("/")
+public class StreamingService {
+    private static final Map<String, MediaType> MEDIA_TYPE_TABLE;
+    static {
+        MEDIA_TYPE_TABLE = new HashMap<String, MediaType>();
+        MEDIA_TYPE_TABLE.put("pdf", MediaType.valueOf("application/pdf"));
+        MEDIA_TYPE_TABLE.put("odt", 
MediaType.valueOf("application/vnd.oasis.opendocument.text"));
+        MEDIA_TYPE_TABLE.put("odp", 
MediaType.valueOf("application/vnd.oasis.opendocument.presentation"));
+    }
+    private Executor executor = new ThreadPoolExecutor(5, 5, 0, 
TimeUnit.SECONDS,
+                                                       new 
ArrayBlockingQueue<Runnable>(10));
+    
+    private String receiverType;
+    
+    public StreamingService(String receiverType) {
+        this.receiverType = receiverType;
+    }
+    
+    @POST
+    @Path("/multipart")
+    @Consumes("multipart/form-data")
+    @Produces("text/plain")
+    public void processMultipartStream(@Suspended AsyncResponse async, 
+                                       @Multipart("file") Attachment att) {
+        MediaType mediaType = att.getContentType();
+        if (mediaType == null) {
+            String fileName = att.getContentDisposition().getFilename();
+            if (fileName != null) {
+                int extDot = fileName.lastIndexOf('.');
+                if (extDot > 0) {
+                    mediaType = MEDIA_TYPE_TABLE.get(fileName.substring(extDot 
+ 1));
+                }
+            }
+        }
+        
+        TikaContentExtractor tika = new TikaContentExtractor();
+        TikaContent tikaContent = 
tika.extract(att.getObject(InputStream.class),
+                                               mediaType);
+        processStream(async, 
SparkUtils.getStringsFromString(tikaContent.getContent()));
+    }
+    
+    @POST
+    @Path("/stream")
+    @Consumes("text/plain")
+    @Produces("text/plain")
+    public void processSimpleStream(@Suspended AsyncResponse async, 
InputStream is) {
+        processStream(async, SparkUtils.getStringsFromInputStream(is));
+    }
+
+    private void processStream(AsyncResponse async, List<String> inputStrings) 
{
+        try {
+            SparkConf sparkConf = new SparkConf().setMaster("local[*]")
+                .setAppName("JAX-RS Spark Connect " + 
SparkUtils.getRandomId());
+            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1)); 
+            
+            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
+            SparkStreamingListener sparkListener =  new 
SparkStreamingListener(streamOut);
+            jssc.addStreamingListener(sparkListener);
+            
+            JavaDStream<String> receiverStream = null;
+            if ("queue".equals(receiverType)) {
+               Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
+               for (int i = 0; i < 30; i++) {
+                   rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
+               }
+               receiverStream = jssc.queueStream(rddQueue);
+            } else {
+               receiverStream = jssc.receiverStream(new 
StringListReceiver(inputStrings));
+            }
+            
+            JavaPairDStream<String, Integer> wordCounts = 
SparkUtils.createOutputDStream(receiverStream);
+            wordCounts.foreachRDD(new OutputFunction(streamOut));
+            jssc.start();
+                                                    
+            executor.execute(new SparkJob(async, sparkListener));
+        } catch (Exception ex) {
+            // the compiler does not allow to catch SparkException directly
+            if (ex instanceof SparkException) {
+                async.cancel(60);
+            } else {
+                async.resume(new WebApplicationException(ex));
+            }
+        }
+    }
+    
+    
+    private static class OutputFunction implements 
VoidFunction<JavaPairRDD<String, Integer>> {
+        private static final long serialVersionUID = 1L;
+        private SparkStreamingOutput streamOut;
+        OutputFunction(SparkStreamingOutput streamOut) {
+            this.streamOut = streamOut;
+        }
+        @Override
+        public void call(JavaPairRDD<String, Integer> rdd) {
+            for (Map.Entry<String, Integer> entry : 
rdd.collectAsMap().entrySet()) {
+                String value = entry.getKey() + " : " + entry.getValue() + 
"\n";
+                streamOut.addResponseEntry(value);
+            }
+        }
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java
new file mode 100644
index 0000000..f5cac28
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.simple;
+
+import java.util.List;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class StringListReceiver extends Receiver<String> {
+
+    private static final long serialVersionUID = 1L;
+    private List<String> inputStrings;
+    
+    public StringListReceiver(List<String> inputStrings) {
+        super(StorageLevel.MEMORY_ONLY());
+        this.inputStrings = inputStrings;
+    }
+    @Override
+    public void onStart() {
+        super.store(inputStrings.iterator());
+    }
+    @Override
+    public void onStop() {
+        // complete
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
new file mode 100644
index 0000000..dc65c9d
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package demo.jaxrs.server.socket;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.StorageLevels;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import demo.jaxrs.server.SparkUtils;
+import demo.jaxrs.server.simple.SparkStreamingListener;
+import demo.jaxrs.server.simple.SparkStreamingOutput;
+
+
+public class Server {
+
+    protected Server(String args[]) throws Exception {
+            
+        ServerSocket sparkServerSocket = new ServerSocket(9999);
+        ServerSocket jaxrsResponseServerSocket = new ServerSocket(10000);
+        Socket jaxrsResponseClientSocket = new Socket("localhost", 10000);
+        
+        
+        SparkConf sparkConf = new SparkConf().setMaster("local[*]")
+            .setAppName("JAX-RS Spark Socket Connect");
+        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1)); 
+        
+        SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
+        SparkStreamingListener sparkListener =  new 
SparkStreamingListener(streamOut);
+        jssc.addStreamingListener(sparkListener);
+        
+        JavaDStream<String> receiverStream = jssc.socketTextStream(
+            "localhost", 9999, StorageLevels.MEMORY_ONLY);
+        
+        JavaPairDStream<String, Integer> wordCounts = 
SparkUtils.createOutputDStream(receiverStream);
+        PrintStream sparkResponseOutputStream = new 
PrintStream(jaxrsResponseClientSocket.getOutputStream(), true);
+        wordCounts.foreachRDD(new 
SocketOutputFunction(sparkResponseOutputStream));
+        
+        jssc.start();
+        
+        Socket receiverClientSocket = sparkServerSocket.accept();
+        PrintStream sparkOutputStream = new 
PrintStream(receiverClientSocket.getOutputStream(), true);
+        BufferedReader sparkInputStream = 
+            new BufferedReader(new 
InputStreamReader(jaxrsResponseServerSocket.accept().getInputStream()));
+        
+        
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        
+        sf.setResourceClasses(StreamingService.class);
+        sf.setResourceProvider(StreamingService.class, 
+            new SingletonResourceProvider(new 
StreamingService(sparkInputStream,
+                                                                     
sparkOutputStream)));
+        sf.setAddress("http://localhost:9000/spark";);
+        sf.create();
+        
+        jssc.awaitTermination();
+        sparkServerSocket.close();
+        jaxrsResponseServerSocket.close();
+        jaxrsResponseClientSocket.close();
+        
+    }
+
+    public static void main(String args[]) throws Exception {
+        new Server(args);
+        System.out.println("Server ready...");
+        Thread.sleep(60 * 60 * 1000);
+        System.out.println("Server exiting");
+        System.exit(0);
+    }
+    
+    private static class SocketOutputFunction implements 
VoidFunction<JavaPairRDD<String, Integer>> {
+        private static final long serialVersionUID = 1L;
+        private PrintStream streamOut;
+        SocketOutputFunction(PrintStream streamOut) {
+            this.streamOut = streamOut;
+        }
+        @Override
+        public void call(JavaPairRDD<String, Integer> rdd) {
+            for (Map.Entry<String, Integer> entry : 
rdd.collectAsMap().entrySet()) {
+                String value = entry.getKey() + " : " + entry.getValue();
+                streamOut.println(value);
+            }
+            if (!rdd.collectAsMap().isEmpty()) {
+                streamOut.println("<batchEnd>");
+            }
+        }
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
new file mode 100644
index 0000000..a24668a
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.socket;
+
+import java.io.BufferedReader;
+import java.io.PrintStream;
+import java.util.List;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class SparkJob implements Runnable {
+    private AsyncResponse ac;
+    private BufferedReader sparkInputStream;
+    private PrintStream sparkOutputStream;
+    private List<String> inputStrings;
+    public SparkJob(AsyncResponse ac, BufferedReader sparkInputStream,
+                          PrintStream sparkOutputStream, List<String> 
inputStrings) {
+        this.ac = ac;
+        this.inputStrings = inputStrings;
+        this.sparkInputStream = sparkInputStream;
+        this.sparkOutputStream = sparkOutputStream;
+    }
+    @Override
+    public void run() {
+        for (String s : inputStrings) {
+            sparkOutputStream.println(s);
+        }
+        ac.resume(new SparkStreamingOutput(sparkInputStream));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
new file mode 100644
index 0000000..cce1275
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.socket;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+public class SparkStreamingOutput implements StreamingOutput {
+    private BufferedReader sparkInputStream;
+    public SparkStreamingOutput(BufferedReader sparkInputStream) {
+        this.sparkInputStream = sparkInputStream;
+    }
+
+    @Override
+    public void write(final OutputStream output) throws IOException, 
WebApplicationException {
+        PrintStream printStream = new PrintStream(output, true);
+        String s = null;
+        while ((s = sparkInputStream.readLine()) != null) {
+            if ("<batchEnd>".equals(s)) {
+                break;
+            }
+            printStream.println(s);
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4bf78010/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
new file mode 100644
index 0000000..7fa69df
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.socket;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
+
+import demo.jaxrs.server.SparkUtils;
+
+
+@Path("/")
+public class StreamingService {
+    private static final Map<String, MediaType> MEDIA_TYPE_TABLE;
+    static {
+        MEDIA_TYPE_TABLE = new HashMap<String, MediaType>();
+        MEDIA_TYPE_TABLE.put("pdf", MediaType.valueOf("application/pdf"));
+        MEDIA_TYPE_TABLE.put("odt", 
MediaType.valueOf("application/vnd.oasis.opendocument.text"));
+        MEDIA_TYPE_TABLE.put("odp", 
MediaType.valueOf("application/vnd.oasis.opendocument.presentation"));
+    }
+    private Executor executor = new ThreadPoolExecutor(5, 5, 0, 
TimeUnit.SECONDS,
+                                                       new 
ArrayBlockingQueue<Runnable>(10));
+    
+    private PrintStream sparkOutputStream;
+    private BufferedReader sparkInputStream;
+    
+    public StreamingService(BufferedReader sparkInputStream, PrintStream 
sparkOutputStream) {
+        this.sparkInputStream = sparkInputStream;
+        this.sparkOutputStream = sparkOutputStream;
+    }
+    
+    @POST
+    @Path("/multipart")
+    @Consumes("multipart/form-data")
+    @Produces("text/plain")
+    public void processMultipartStream(@Suspended AsyncResponse async, 
+                                       @Multipart("file") Attachment att) {
+        MediaType mediaType = att.getContentType();
+        if (mediaType == null) {
+            String fileName = att.getContentDisposition().getFilename();
+            if (fileName != null) {
+                int extDot = fileName.lastIndexOf('.');
+                if (extDot > 0) {
+                    mediaType = MEDIA_TYPE_TABLE.get(fileName.substring(extDot 
+ 1));
+                }
+            }
+        }
+        
+        TikaContentExtractor tika = new TikaContentExtractor();
+        TikaContent tikaContent = 
tika.extract(att.getObject(InputStream.class),
+                                               mediaType);
+        processStream(async, 
SparkUtils.getStringsFromString(tikaContent.getContent()));
+    }
+    
+    @POST
+    @Path("/stream")
+    @Consumes("text/plain")
+    @Produces("text/plain")
+    public void processSimpleStream(@Suspended AsyncResponse async, 
InputStream is) {
+        processStream(async, SparkUtils.getStringsFromInputStream(is));
+    }
+
+    private void processStream(AsyncResponse async, List<String> inputStrings) 
{
+        executor.execute(
+            new SparkJob(async, sparkInputStream, sparkOutputStream, 
inputStrings));
+    }
+    
+    
+}

Reply via email to