* GoraSpark.java: the initializeInput method is renamed to initialize. * A reduce step is added to the example.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/81af4d3a Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/81af4d3a Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/81af4d3a Branch: refs/heads/master Commit: 81af4d3afcba4633d0c5d06ead9b4256ea60862f Parents: 445edb1 Author: Furkan KAMACI <[email protected]> Authored: Mon Jun 29 18:33:38 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Mon Jun 29 18:33:38 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/gora/spark/GoraSpark.java | 4 +- .../gora/tutorial/log/LogAnalyticsSpark.java | 49 ++++++++++++++++---- 2 files changed, 43 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java index 4279cfb..02b5b39 100644 --- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java +++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java @@ -41,7 +41,7 @@ public class GoraSpark<K, V extends Persistent> { this.clazzV = clazzV; } - public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext, + public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, Configuration conf, DataStore<K, V> dataStore) { GoraMapReduceUtils.setIOSerializations(conf, true); @@ -56,7 +56,7 @@ public class GoraSpark<K, V extends Persistent> { clazzV); } - public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext, + public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, DataStore<K, V> dataStore) { Configuration hadoopConf; 
http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index 0ad3e57..d69649e 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -20,6 +20,7 @@ package org.apache.gora.tutorial.log; import org.apache.gora.spark.GoraSpark; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.tutorial.log.generated.MetricDatum; import org.apache.gora.tutorial.log.generated.Pageview; import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; @@ -27,11 +28,11 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; -import java.util.concurrent.TimeUnit; - public class LogAnalyticsSpark { private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>"; @@ -39,18 +40,44 @@ public class LogAnalyticsSpark { /** The number of milliseconds in a day */ private static final long DAY_MILIS = 1000 * 60 * 60 * 24; - //todo _fk consider using Kyro serialization - private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () { + // todo _fk consider using Kyro serialization + private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, 
Long>, Long>>() { @Override - public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception { + public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) + throws Exception { String url = pageview.getUrl().toString(); Long day = getDay(pageview.getTimestamp()); - Tuple2<String, Long> keyTuple =new Tuple2<>(url, day); + Tuple2<String, Long> keyTuple = new Tuple2<>(url, day); return new Tuple2<>(keyTuple, 1L); } }; + private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() { + @Override + public Long call(Long aLong, Long aLong2) throws Exception { + return aLong + aLong2; + } + }; + + private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() { + @Override + public Tuple2<String, MetricDatum> call( + Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception { + String dimension = tuple2LongTuple2._1()._1(); + long timestamp = tuple2LongTuple2._1()._2(); + + MetricDatum metricDatum = new MetricDatum(); + metricDatum.setMetricDimension(dimension); + metricDatum.setTimestamp(timestamp); + + String key = metricDatum.getMetricDimension().toString(); + key += "_" + Long.toString(timestamp); + metricDatum.setMetric(tuple2LongTuple2._2()); + return new Tuple2<>(key, metricDatum); + } + }; + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println(USAGE); @@ -93,7 +120,7 @@ public class LogAnalyticsSpark { DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore( inStoreClass, Long.class, Pageview.class, hadoopConf); - JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, + JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc, dataStore); long count = goraRDD.count(); @@ -102,7 +129,13 @@ public class LogAnalyticsSpark { String firstOneURL = goraRDD.first()._2().getUrl().toString(); System.out.println(firstOneURL); - 
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s); + JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD + .values().map(mapFunc); + + JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD + .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc); + + System.out.println("MetricDatum count:" + reducedGoraRdd.count()); return 1; }
