Repository: cxf
Updated Branches:
  refs/heads/master 0af65a4ab -> 629af817f


Optional support for the paralellized collections in the spark demo


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/629af817
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/629af817
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/629af817

Branch: refs/heads/master
Commit: 629af817f4762a12623e76fb7f7baefe98482719
Parents: 0af65a4
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Sep 20 13:42:15 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Sep 20 13:42:15 2016 +0100

----------------------------------------------------------------------
 .../src/main/java/demo/jaxrs/server/Server.java |  9 ++++--
 .../jaxrs/server/SparkStreamingListener.java    |  4 +++
 .../demo/jaxrs/server/SparkStreamingOutput.java |  3 ++
 .../demo/jaxrs/server/StreamingService.java     | 30 ++++++++++++++------
 4 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index 8a1092f..d2b3be9 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -25,18 +25,21 @@ import 
org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
 
 public class Server {
 
-    protected Server() throws Exception {
+    protected Server(String args[]) throws Exception {
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.setResourceClasses(StreamingService.class);
+        
+        String receiverType = args.length == 1 && 
args[0].equals("-receiverType=queue") ?
+            "queue" : "string";
         sf.setResourceProvider(StreamingService.class, 
-            new SingletonResourceProvider(new StreamingService()));
+            new SingletonResourceProvider(new StreamingService(receiverType)));
         sf.setAddress("http://localhost:9000/spark";);
 
         sf.create();
     }
 
     public static void main(String args[]) throws Exception {
-        new Server();
+        new Server(args);
         System.out.println("Server ready...");
         Thread.sleep(60 * 60 * 1000);
         System.out.println("Server exiting");

http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 8a891c2..6cb68e8 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -31,6 +31,7 @@ import 
org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
 public class SparkStreamingListener implements StreamingListener {
     private SparkStreamingOutput streamOutput;
     private boolean batchStarted;
+    private long batchStartAt;
     
     public SparkStreamingListener(SparkStreamingOutput streamOutput) {
         this.streamOutput = streamOutput;
@@ -38,12 +39,15 @@ public class SparkStreamingListener implements 
StreamingListener {
 
     @Override
     public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+        System.out.println("Batch processing time in millisecs: " + 
(System.currentTimeMillis() - batchStartAt));
+        
         streamOutput.setSparkBatchCompleted();
     }
 
     @Override
     public synchronized void onBatchStarted(StreamingListenerBatchStarted 
event) {
         batchStarted = true;
+        batchStartAt = System.currentTimeMillis();
         notify();
     }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 7324806..300a0f7 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -36,8 +36,10 @@ public class SparkStreamingOutput implements StreamingOutput 
{
     private JavaStreamingContext jssc;
     private volatile boolean sparkBatchCompleted;
     private volatile boolean outputWriteDone;
+    private long startAt;
     public SparkStreamingOutput(JavaStreamingContext jssc) {
         this.jssc = jssc;
+        this.startAt = System.currentTimeMillis();
     }
 
     @Override
@@ -57,6 +59,7 @@ public class SparkStreamingOutput implements StreamingOutput {
         
         jssc.stop(false);
         jssc.close();
+        System.out.println("Total processing time in millisecs: " + 
(System.currentTimeMillis() - startAt));
     }
     
     

http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 2ce1531..a35b68a 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Executor;
@@ -52,13 +53,12 @@ import 
org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.Receiver;
 
 import scala.Tuple2;
 
@@ -74,7 +74,11 @@ public class StreamingService {
     }
     private Executor executor = new ThreadPoolExecutor(5, 5, 0, 
TimeUnit.SECONDS,
                                                        new 
ArrayBlockingQueue<Runnable>(10));
-    public StreamingService() {
+    
+    private String receiverType;
+    
+    public StreamingService(String receiverType) {
+        this.receiverType = receiverType;
     }
     
     @POST
@@ -97,7 +101,7 @@ public class StreamingService {
         TikaContentExtractor tika = new TikaContentExtractor();
         TikaContent tikaContent = 
tika.extract(att.getObject(InputStream.class),
                                                mediaType);
-        processStream(async, new 
StringListReceiver(getStringsFromString(tikaContent.getContent())));
+        processStream(async, getStringsFromString(tikaContent.getContent()));
     }
     
     @POST
@@ -105,10 +109,10 @@ public class StreamingService {
     @Consumes("text/plain")
     @Produces("text/plain")
     public void processSimpleStream(@Suspended AsyncResponse async, 
InputStream is) {
-        processStream(async, new 
StringListReceiver(getStringsFromInputStream(is)));
+        processStream(async, getStringsFromInputStream(is));
     }
 
-    private void processStream(AsyncResponse async, Receiver<String> receiver) 
{
+    private void processStream(AsyncResponse async, List<String> inputStrings) 
{
         try {
             SparkConf sparkConf = new SparkConf().setMaster("local[*]")
                 .setAppName("JAX-RS Spark Connect " + getRandomId());
@@ -118,7 +122,17 @@ public class StreamingService {
             SparkStreamingListener sparkListener =  new 
SparkStreamingListener(streamOut);
             jssc.addStreamingListener(sparkListener);
             
-            JavaReceiverInputDStream<String> receiverStream = 
jssc.receiverStream(receiver);
+            JavaDStream<String> receiverStream = null;
+            if ("queue".equals(receiverType)) {
+               Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
+               for (int i = 0; i < 30; i++) {
+                   rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
+               }
+               receiverStream = jssc.queueStream(rddQueue);
+            } else {
+               receiverStream = jssc.receiverStream(new 
StringListReceiver(inputStrings));
+            }
+            
             JavaPairDStream<String, Integer> wordCounts = 
createOutputDStream(receiverStream);
             wordCounts.foreachRDD(new OutputFunction(streamOut));
             jssc.start();
@@ -135,7 +149,7 @@ public class StreamingService {
     }
     
     private static JavaPairDStream<String, Integer> createOutputDStream(
-            JavaReceiverInputDStream<String> receiverStream) {
+        JavaDStream<String> receiverStream) {
         final JavaDStream<String> words = 
             receiverStream.flatMap(x -> splitInputString(x));
             

Reply via email to