djkevincr commented on a change in pull request #161: GORA-565: Enable Spark in 
Unit Tests
URL: https://github.com/apache/gora/pull/161#discussion_r276698959
 
 

 ##########
 File path: 
gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
 ##########
 @@ -31,122 +34,151 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.util.Map;
+import scala.Tuple2;
 
 /**
  * Classic word count example in Gora with Spark.
  */
 public class SparkWordCount {
-  private static final Logger log = 
LoggerFactory.getLogger(SparkWordCount.class);
-
-  private static final String USAGE = "SparkWordCount <input_data_store> 
<output_data_store>";
-
-  /**
-   * map function used in calculation
-   */
-  private static Function<WebPage, Tuple2<String, Long>> mapFunc =
-    new Function<WebPage, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> call(WebPage webPage)
-                throws Exception {
-          String content = new String(webPage.getContent().array(), 
Charset.defaultCharset());
-          return new Tuple2<>(content, 1L);
-        }
-  };
-
-  /**
-   * reduce function used in calculation
-   */
-  private static Function2<Long, Long, Long> redFunc = new Function2<Long, 
Long, Long>() {
-    @Override
-    public Long call(Long aLong, Long aLong2) throws Exception {
-      return aLong + aLong2;
-    }
-  };
-
-  public int wordCount(DataStore<String,WebPage> inStore,
-    DataStore<String, TokenDatum> outStore) throws IOException {
-
-    //Spark engine initialization
-    GoraSparkEngine<String, WebPage> goraSparkEngine = new 
GoraSparkEngine<>(String.class,
-       WebPage.class);
-
-    SparkConf sparkConf = new SparkConf().setAppName(
-      "Gora Spark Word Count Application").setMaster("local");
-
-    Class[] c = new Class[1];
-    c[0] = inStore.getPersistentClass();
-    sparkConf.registerKryoClasses(c);
-    //
-    JavaSparkContext sc = new JavaSparkContext(sparkConf);
-
-    JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, 
inStore);
-
-    long count = goraRDD.count();
-    log.info("Total Web page count: {}", count);
-
-    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = 
goraRDD.values().map(mapFunc);
-
-    JavaPairRDD<String, Long> reducedGoraRdd = 
JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
-
-    //Print output for debug purpose
-    log.info("SparkWordCount debug purpose TokenDatum print starts:");
-    Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
-    for (String key : tokenDatumMap.keySet()) {
-      log.info(key);
-      log.info(tokenDatumMap.get(key).toString());
-    }
-    log.info("SparkWordCount debug purpose TokenDatum print ends:");
-    //
-
-    //write output to datastore
-    Configuration sparkHadoopConf = 
goraSparkEngine.generateOutputConf(outStore);
-    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
-    //
-
-    return 1;
-  }
-
-  public int run(String[] args) throws Exception {
-
-    DataStore<String,WebPage> inStore;
-    DataStore<String, TokenDatum> outStore;
-    Configuration hadoopConf = new Configuration();
-    if(args.length > 0) {
-      String dataStoreClass = args[0];
-      inStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, 
WebPage.class, hadoopConf);
-      if(args.length > 1) {
-        dataStoreClass = args[1];
-      }
-      outStore = DataStoreFactory.getDataStore(dataStoreClass,
-        String.class, TokenDatum.class, hadoopConf);
-      } else {
-        inStore = DataStoreFactory.getDataStore(String.class, WebPage.class, 
hadoopConf);
-        outStore = DataStoreFactory.getDataStore(String.class, 
TokenDatum.class, hadoopConf);
-      }
-
-      return wordCount(inStore, outStore);
-  }
-
-  public static void main(String[] args) throws Exception {
-
-    if (args.length < 2) {
-      log.info(USAGE);
-      System.exit(1);
-    }
-
-    SparkWordCount sparkWordCount = new SparkWordCount();
-
-    try {
-      int ret = sparkWordCount.run(args);
-      System.exit(ret);
-    } catch (Exception ex){
-      log.error("Error occurred!");
-    }
-  }
+       private static final Logger log = 
LoggerFactory.getLogger(SparkWordCount.class);
+
+       private static final String USAGE = "SparkWordCount <input_data_store> 
<output_data_store>";
+
+       /**
+        * Flattens a WebPage's content into an Iterable of words; the map
+        * function takes each of these words as its input.
+        */
+       private static Function<WebPage, Iterable<String>> flatMapFun = new 
Function<WebPage, Iterable<String>>() {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Iterable<String> call(WebPage v1) throws Exception {
+                       String content = new String(v1.getContent().array(), 
Charset.defaultCharset());
+                       return Arrays.asList(content.split(" "));
+               }
+       };
+
+       /**
+        * Map function used to map out each word with a count of 1
+        */
+       private static Function<String, Tuple2<String, Long>> mapFunc = new 
Function<String, Tuple2<String, Long>>() {
+               @Override
+               public Tuple2<String, Long> call(String s) throws Exception {
+                       return new Tuple2<>(s, 1L);
+               }
+       };
+
+       /**
+        * reduce function used in calculation
+        */
+       private static Function2<Long, Long, Long> redFunc = new 
Function2<Long, Long, Long>() {
+               @Override
+               public Long call(Long aLong, Long aLong2) throws Exception {
+                       return aLong + aLong2;
+               }
+       };
+
+       /**
+        * Convert the key value pair <String, Long> to <String, TokenDatum> as 
per the specification in the mapping file
+        */
+       private static PairFunction<Tuple2<String, Long>, String, TokenDatum> 
metricFunc = new PairFunction<Tuple2<String, Long>, String, TokenDatum>() {
+               @Override
+               public Tuple2<String, TokenDatum> call(Tuple2<String, Long> 
line) throws Exception {
+                       String word = line._1();
+                       TokenDatum tDatum = new TokenDatum();
+                       tDatum.setCount(line._2.intValue());
+                       return new Tuple2<>(word, tDatum);
+               }
+       };
+
+       public int wordCount(DataStore<String, WebPage> inStore, 
DataStore<String, TokenDatum> outStore)
+                       throws IOException {
+
+               // Spark engine initialization
+               GoraSparkEngine<String, WebPage> goraSparkEngine = new 
GoraSparkEngine<>(String.class, WebPage.class);
+
+               SparkConf sparkConf = new SparkConf().setAppName("Gora Spark 
Word Count Application").setMaster("local");
+
+               Class[] c = new Class[1];
+               c[0] = inStore.getPersistentClass();
+               sparkConf.registerKryoClasses(c);
+               //
+               JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+               JavaPairRDD<String, WebPage> goraRDD = 
goraSparkEngine.initialize(sc, inStore);
+
+               JavaPairRDD<String, String> goraRDDFlatenned = 
goraRDD.flatMapValues(flatMapFun);
+
+               // Map<String, WebPage> debug = goraRDD.collectAsMap();
+               // for(WebPage k: debug.values()){
+               // System.out.println(k);
+               // //System.out.println(debug.get(k).getUrl());
+               // }
+
+               long count = goraRDD.count();
+               log.info("Total Web page count: {}", count);
+
+               JavaRDD<Tuple2<String, Long>> mappedGoraRdd = 
goraRDDFlatenned.values().map(mapFunc);
+               System.out.println(mappedGoraRdd.collect());
+
+               JavaPairRDD<String, TokenDatum> reducedGoraRdd = 
JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc)
+                               .mapToPair(metricFunc);
+
+               // Print output for debug purpose
+               log.info("SparkWordCount debug purpose TokenDatum print 
starts:");
+               Map<String, TokenDatum> tokenDatumMap = 
reducedGoraRdd.collectAsMap();
+               for (String key : tokenDatumMap.keySet()) {
+                       log.info(key);
+                       log.info(tokenDatumMap.get(key).toString());
+               }
+               log.info("SparkWordCount debug purpose TokenDatum print ends:");
+
+               // write output to datastore
+               System.out.println(reducedGoraRdd.collect());
+               Configuration sparkHadoopConf = 
goraSparkEngine.generateOutputConf(outStore);
+               reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
+               System.out.println(outStore.get("d"));
+               return 1;
+       }
+
+       public int run(String[] args) throws Exception {
+
 
 Review comment:
   Please reformat this code to use 2-space indentation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to