* Map function is implemented the same way as in LogAnalytics.java
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5644a21c Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5644a21c Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5644a21c Branch: refs/heads/master Commit: 5644a21c5c5678611c7cef4e2c922643088951b8 Parents: 8fbdef7 Author: Furkan KAMACI <[email protected]> Authored: Mon Jun 29 02:10:26 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Mon Jun 29 02:10:26 2015 +0300 ---------------------------------------------------------------------- .../gora/tutorial/log/LogAnalyticsSpark.java | 38 +++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/5644a21c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index 0194c3b..43acba0 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -24,12 +24,31 @@ import org.apache.gora.tutorial.log.generated.Pageview; import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import scala.Tuple2; + +import java.util.concurrent.TimeUnit; public class LogAnalyticsSpark { private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>"; + /** The number of milliseconds in a day */ + private static final long DAY_MILIS = 1000 * 
60 * 60 * 24; + + //todo _fk consider using Kyro serialization + private static Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() { + @Override + public Tuple2<String, Long> call(Pageview pageview) throws Exception { + String key = pageview.getUrl().toString(); + Long value = getDay(pageview.getTimestamp()); + return new Tuple2<>(key, value); + } + }; + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println(USAGE); @@ -45,14 +64,22 @@ public class LogAnalyticsSpark { System.exit(ret); } + /** + * Rolls up the given timestamp to the day cardinality, so that data can be + * aggregated daily + */ + private static long getDay(long timeStamp) { + return (timeStamp / DAY_MILIS) * DAY_MILIS; + } + public int run(String inStoreClass, String outStoreClass) throws Exception { GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class, Pageview.class); SparkConf sparkConf = new SparkConf().setAppName( "Gora Integration Application").setMaster("local"); - - //todo _fk change architectural desigm + + // todo _fk consider alternative architectural design Class[] c = new Class[1]; c[0] = Pageview.class; sparkConf.registerKryoClasses(c); @@ -64,9 +91,8 @@ public class LogAnalyticsSpark { DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore( inStoreClass, Long.class, Pageview.class, hadoopConf); - JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore); - // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> - // cachedGoraRdd = goraRDD.cache(); + JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, + dataStore); long count = goraRDD.count(); System.out.println("Total Count: " + count); @@ -74,6 +100,8 @@ public class LogAnalyticsSpark { String firstOneURL = goraRDD.first()._2().getUrl().toString(); System.out.println(firstOneURL); + JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s); + return 1; } }
