Spark engine word count tests are implemented. Due to a connection problem, the Spark engine word count tests are currently disabled (marked with @Ignore).
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/e817a7f0 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/e817a7f0 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/e817a7f0 Branch: refs/heads/master Commit: e817a7f0eeb2efb98937efb92701705ec80891b9 Parents: 9cff335 Author: Furkan KAMACI <[email protected]> Authored: Sat Aug 22 17:31:57 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Sat Aug 22 17:31:57 2015 +0300 ---------------------------------------------------------------------- .../gora/examples/spark/SparkWordCount.java | 150 +++++++++++++++++++ .../gora/mapreduce/MapReduceTestUtils.java | 31 ++++ .../mapreduce/TestHBaseStoreWordCount.java | 8 + .../mapreduce/TestMongoStoreWordCount.java | 9 ++ 4 files changed, 198 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java ---------------------------------------------------------------------- diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java new file mode 100644 index 0000000..837da13 --- /dev/null +++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.examples.spark; + +import org.apache.gora.examples.generated.TokenDatum; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.spark.GoraSparkEngine; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Map; + +/** + * Classic word count example in Gora with Spark. 
+ */ +public class SparkWordCount { + private static final Logger log = LoggerFactory.getLogger(SparkWordCount.class); + + private static final String USAGE = "SparkWordCount <input_data_store> <output_data_store>"; + + /** + * map function used in calculation + */ + private static Function<WebPage, Tuple2<String, Long>> mapFunc = + new Function<WebPage, Tuple2<String, Long>>() { + @Override + public Tuple2<String, Long> call(WebPage webPage) + throws Exception { + String content = new String(webPage.getContent().array()); + return new Tuple2<>(content, 1L); + } + }; + + /** + * reduce function used in calculation + */ + private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() { + @Override + public Long call(Long aLong, Long aLong2) throws Exception { + return aLong + aLong2; + } + }; + + public int wordCount(DataStore<String,WebPage> inStore, + DataStore<String, TokenDatum> outStore) throws IOException { + + //Spark engine initialization + GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class, + WebPage.class); + + SparkConf sparkConf = new SparkConf().setAppName( + "Gora Spark Word Count Application").setMaster("local"); + + Class[] c = new Class[1]; + c[0] = inStore.getPersistentClass(); + sparkConf.registerKryoClasses(c); + // + JavaSparkContext sc = new JavaSparkContext(sparkConf); + + JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore); + + long count = goraRDD.count(); + System.out.println("Total Log Count: " + count); + + JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc); + + JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc); + + //Print output for debug purpose + System.out.println("SparkWordCount debug purpose TokenDatum print starts:"); + Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap(); + for (String key : tokenDatumMap.keySet()) { + System.out.println(key); + 
System.out.println(tokenDatumMap.get(key)); + } + System.out.println("SparkWordCount debug purpose TokenDatum print ends:"); + // + + //write output to datastore + Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore); + reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf); + // + + return 1; + } + + public int run(String[] args) throws Exception { + + DataStore<String,WebPage> inStore; + DataStore<String, TokenDatum> outStore; + Configuration hadoopConf = new Configuration(); + if(args.length > 0) { + String dataStoreClass = args[0]; + inStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class, hadoopConf); + if(args.length > 1) { + dataStoreClass = args[1]; + } + outStore = DataStoreFactory.getDataStore(dataStoreClass, + String.class, TokenDatum.class, hadoopConf); + } else { + inStore = DataStoreFactory.getDataStore(String.class, WebPage.class, hadoopConf); + outStore = DataStoreFactory.getDataStore(String.class, TokenDatum.class, hadoopConf); + } + + return wordCount(inStore, outStore); + } + + public static void main(String[] args) throws Exception { + + if (args.length < 2) { + System.err.println(USAGE); + System.exit(1); + } + + SparkWordCount sparkWordCount = new SparkWordCount(); + + try { + int ret = sparkWordCount.run(args); + System.exit(ret); + } catch (Exception ex){ + log.error("Error occurred!"); + } + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java ---------------------------------------------------------------------- diff --git a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java index 12ccea4..c04f615 100644 --- a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java +++ b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java @@ -28,6 +28,7 @@ import 
org.apache.gora.examples.generated.TokenDatum; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.examples.mapreduce.QueryCounter; import org.apache.gora.examples.mapreduce.WordCount; +import org.apache.gora.examples.spark.SparkWordCount; import org.apache.gora.query.Query; import org.apache.gora.store.DataStore; import org.apache.gora.store.impl.DataStoreBase; @@ -103,6 +104,36 @@ public class MapReduceTestUtils { assertTokenCount(outStore, entry.getKey(), entry.getValue()); } } + + public static void testSparkWordCount(Configuration conf, DataStore<String,WebPage> inStore, DataStore<String, + TokenDatum> outStore) throws Exception { + //Datastore now has to be a Hadoop based datastore + ((DataStoreBase<String,WebPage>)inStore).setConf(conf); + ((DataStoreBase<String,TokenDatum>)outStore).setConf(conf); + + //create input + WebPageDataCreator.createWebPageData(inStore); + + //run Spark + SparkWordCount wordCount = new SparkWordCount(); + wordCount.wordCount(inStore, outStore); + + //assert results + HashMap<String, Integer> actualCounts = new HashMap<String, Integer>(); + for(String content : WebPageDataCreator.CONTENTS) { + if (content != null) { + for(String token:content.split(" ")) { + Integer count = actualCounts.get(token); + if(count == null) + count = 0; + actualCounts.put(token, ++count); + } + } + } + for(Map.Entry<String, Integer> entry:actualCounts.entrySet()) { + assertTokenCount(outStore, entry.getKey(), entry.getValue()); + } + } private static void assertTokenCount(DataStore<String, TokenDatum> outStore, String token, int count) throws Exception { http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java ---------------------------------------------------------------------- diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java 
b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java index b42b0c0..cb4ef03 100644 --- a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java +++ b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java @@ -27,6 +27,7 @@ import org.apache.gora.store.DataStoreFactory; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; /** @@ -58,4 +59,11 @@ public class TestHBaseStoreWordCount { MapReduceTestUtils.testWordCount(cluster.getConf(), webPageStore, tokenStore); } + //todo fix config + @Ignore + @Test + public void testSparkWordCount() throws Exception { + MapReduceTestUtils.testSparkWordCount(cluster.getConf(), webPageStore, tokenStore); + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java index d6a51a4..dd76263 100644 --- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java +++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java @@ -24,6 +24,7 @@ import org.apache.gora.mongodb.store.MongoStore; import org.apache.gora.store.DataStoreFactory; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; /** @@ -55,4 +56,12 @@ public class TestMongoStoreWordCount extends GoraMongoMapredTest { webPageStore, tokenStore); } + //todo fix config + @Ignore + @Test + public void testSparkWordCount() throws Exception { + MapReduceTestUtils.testSparkWordCount(testDriver.getConfiguration(), + webPageStore, tokenStore); + } + }
