Repository: cxf
Updated Branches:
  refs/heads/master c1e2c973f -> 122d1f64f


Preparing Spark demo for the next enhancement


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/122d1f64
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/122d1f64
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/122d1f64

Branch: refs/heads/master
Commit: 122d1f64f248294a4303632571ec363aea4c2179
Parents: c1e2c97
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Sep 20 16:05:12 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Sep 20 16:05:12 2016 +0100

----------------------------------------------------------------------
 .../src/main/java/demo/jaxrs/server/Server.java |  1 +
 .../main/java/demo/jaxrs/server/SparkUtils.java | 96 ++++++++++++++++++++
 .../demo/jaxrs/server/StreamingService.java     | 69 ++------------
 3 files changed, 103 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/122d1f64/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index d2b3be9..473a534 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -46,4 +46,5 @@ public class Server {
         System.exit(0);
     }
     
+    
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/122d1f64/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
new file mode 100644
index 0000000..a01db0f
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package demo.jaxrs.server;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import javax.ws.rs.WebApplicationException;
+
+import org.apache.cxf.common.util.Base64Utility;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+import scala.Tuple2;
+
+
+public final class SparkUtils {
+    
+    private SparkUtils() {
+    }
+    
+    public static JavaPairDStream<String, Integer> createOutputDStream(
+        JavaDStream<String> receiverStream) {
+        final JavaDStream<String> words = 
+            receiverStream.flatMap(x -> splitInputString(x));
+            
+        final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> {
+                    return new Tuple2<String, Integer>(s, 1);
+                });
+        return pairs.reduceByKey((i1, i2) -> {
+                    return i1 + i2;
+                });
+    }
+    public static Iterator<String> splitInputString(String x) {
+        List<String> list = new LinkedList<String>();
+        for (String s : Arrays.asList(x.split(" "))) {
+            s = s.trim();
+            if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || 
s.endsWith(".")) {
+                s = s.substring(0, s.length() - 1);
+            }
+            if (!s.isEmpty()) {
+                list.add(s);
+            }
+        }
+        return list.iterator();
+    }
+    public static String getRandomId() {
+        byte[] bytes = new byte[10];
+        new Random().nextBytes(bytes);
+        return Base64Utility.encode(bytes);
+    }
+    public static List<String> getStringsFromInputStream(InputStream is) {
+        return getStringsFromReader(new BufferedReader(new 
InputStreamReader(is)));
+    }
+    public static List<String> getStringsFromString(String s) {
+        return getStringsFromReader(new BufferedReader(new StringReader(s)));
+    }
+    public static List<String> getStringsFromReader(BufferedReader reader) {
+        
+        List<String> inputStrings = new LinkedList<String>();
+        String userInput = null;
+        try {
+            while ((userInput = reader.readLine()) != null) {
+                inputStrings.add(userInput);
+            }
+        } catch (IOException ex) {
+            throw new WebApplicationException(ex);
+        }
+        return inputStrings;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/122d1f64/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index a35b68a..ae5b8dd 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -18,19 +18,12 @@
  */
 package demo.jaxrs.server;
 
-import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.StringReader;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -45,7 +38,6 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
-import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.jaxrs.ext.multipart.Attachment;
 import org.apache.cxf.jaxrs.ext.multipart.Multipart;
 import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
@@ -60,8 +52,6 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
-import scala.Tuple2;
-
 
 @Path("/")
 public class StreamingService {
@@ -101,7 +91,7 @@ public class StreamingService {
         TikaContentExtractor tika = new TikaContentExtractor();
         TikaContent tikaContent = 
tika.extract(att.getObject(InputStream.class),
                                                mediaType);
-        processStream(async, getStringsFromString(tikaContent.getContent()));
+        processStream(async, 
SparkUtils.getStringsFromString(tikaContent.getContent()));
     }
     
     @POST
@@ -109,13 +99,13 @@ public class StreamingService {
     @Consumes("text/plain")
     @Produces("text/plain")
     public void processSimpleStream(@Suspended AsyncResponse async, 
InputStream is) {
-        processStream(async, getStringsFromInputStream(is));
+        processStream(async, SparkUtils.getStringsFromInputStream(is));
     }
 
     private void processStream(AsyncResponse async, List<String> inputStrings) 
{
         try {
             SparkConf sparkConf = new SparkConf().setMaster("local[*]")
-                .setAppName("JAX-RS Spark Connect " + getRandomId());
+                .setAppName("JAX-RS Spark Connect " + 
SparkUtils.getRandomId());
             JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1)); 
             
             SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
@@ -133,7 +123,7 @@ public class StreamingService {
                receiverStream = jssc.receiverStream(new 
StringListReceiver(inputStrings));
             }
             
-            JavaPairDStream<String, Integer> wordCounts = 
createOutputDStream(receiverStream);
+            JavaPairDStream<String, Integer> wordCounts = 
SparkUtils.createOutputDStream(receiverStream);
             wordCounts.foreachRDD(new OutputFunction(streamOut));
             jssc.start();
                                                     
@@ -148,31 +138,7 @@ public class StreamingService {
         }
     }
     
-    private static JavaPairDStream<String, Integer> createOutputDStream(
-        JavaDStream<String> receiverStream) {
-        final JavaDStream<String> words = 
-            receiverStream.flatMap(x -> splitInputString(x));
-            
-        final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> {
-                    return new Tuple2<String, Integer>(s, 1);
-                });
-        return pairs.reduceByKey((i1, i2) -> {
-                    return i1 + i2;
-                });
-    }
-    private static Iterator<String> splitInputString(String x) {
-        List<String> list = new LinkedList<String>();
-        for (String s : Arrays.asList(x.split(" "))) {
-            s = s.trim();
-            if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || 
s.endsWith(".")) {
-                s = s.substring(0, s.length() - 1);
-            }
-            if (!s.isEmpty()) {
-                list.add(s);
-            }
-        }
-        return list.iterator();
-    }
+    
     private static class OutputFunction implements 
VoidFunction<JavaPairRDD<String, Integer>> {
         private static final long serialVersionUID = 1L;
         private SparkStreamingOutput streamOut;
@@ -188,28 +154,5 @@ public class StreamingService {
         }
         
     }
-    private static String getRandomId() {
-        byte[] bytes = new byte[10];
-        new Random().nextBytes(bytes);
-        return Base64Utility.encode(bytes);
-    }
-    private List<String> getStringsFromInputStream(InputStream is) {
-        return getStringsFromReader(new BufferedReader(new 
InputStreamReader(is)));
-    }
-    private List<String> getStringsFromString(String s) {
-        return getStringsFromReader(new BufferedReader(new StringReader(s)));
-    }
-    private List<String> getStringsFromReader(BufferedReader reader) {
-        
-        List<String> inputStrings = new LinkedList<String>();
-        String userInput = null;
-        try {
-            while ((userInput = reader.readLine()) != null) {
-                inputStrings.add(userInput);
-            }
-        } catch (IOException ex) {
-            throw new WebApplicationException(ex);
-        }
-        return inputStrings;
-    }
+    
 }

Reply via email to