Author: kamaci
Date: Sat Oct 24 22:35:23 2015
New Revision: 1710396
URL: http://svn.apache.org/viewvc?rev=1710396&view=rev
Log:
GoraSparkEngine example added to tutorial.
Modified:
gora/site/trunk/content/current/gora-core.md
gora/site/trunk/content/current/tutorial.md
Modified: gora/site/trunk/content/current/gora-core.md
URL: http://svn.apache.org/viewvc/gora/site/trunk/content/current/gora-core.md?rev=1710396&r1=1710395&r2=1710396&view=diff
==============================================================================
--- gora/site/trunk/content/current/gora-core.md (original)
+++ gora/site/trunk/content/current/gora-core.md Sat Oct 24 22:35:23 2015
@@ -124,7 +124,7 @@ In the stores covered within the gora-co
#GoraSparkEngine
##Description
-GoraSparkEngine is Spark backend of Apache Gora. Assume that input and output data stores are:
+GoraSparkEngine is the Spark backend of Gora. Assume that the input and output data stores are:
DataStore<K1, V1> inStore;
DataStore<K2, V2> outStore;
Modified: gora/site/trunk/content/current/tutorial.md
URL: http://svn.apache.org/viewvc/gora/site/trunk/content/current/tutorial.md?rev=1710396&r1=1710395&r2=1710396&view=diff
==============================================================================
--- gora/site/trunk/content/current/tutorial.md (original)
+++ gora/site/trunk/content/current/tutorial.md Sat Oct 24 22:35:23 2015
@@ -975,6 +975,129 @@ The outputs of the job will be saved in
1236902400000
1 row(s) in 0.0490 seconds
+##Spark Backend
+In this tutorial, a log analytics example will be implemented with GoraSparkEngine to explain the Spark backend of Gora.
+Data will be read from HBase, map and reduce functions will be run, and the results will be written into Solr (version 4.10.3).
+The whole process will run on Spark.
+
+Persist data into HBase as described in [Log analytics in MapReduce](/current/tutorial.html#log-analytics-in-mapreduce).
+
+To write the results into Solr, create a schemaless core named Metrics. The easiest way is to rename the default core collection1, located in the `solr-4.10.3/example/example-schemaless/solr` folder, to Metrics and edit `solr-4.10.3/example/example-schemaless/solr/Metrics/core.properties` as follows:
+
+    name=Metrics
+
+Then run the start command for Solr:
+
+    solr-4.10.3/example$ java -Dsolr.solr.home=example-schemaless/solr/ -jar start.jar
+
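+Optionally, you can check that the Metrics core is reachable before running the job. The snippet below is only a minimal SolrJ sketch, not part of the tutorial code; it assumes the default Solr port and the core created above:
+
+    // Verification sketch using SolrJ's org.apache.solr.client.solrj.impl.HttpSolrServer.
+    HttpSolrServer metricsCore = new HttpSolrServer("http://localhost:8983/solr/Metrics");
+    metricsCore.ping(); // throws an exception if the Metrics core is not reachable
+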
+Now read the data from HBase, generate some metrics, and write the results into Solr with Spark via Gora. Here is how to initialize the input and output data stores:
+
+    public int run(String[] args) throws Exception {
+      DataStore<Long, Pageview> inStore;
+      DataStore<String, MetricDatum> outStore;
+      Configuration hadoopConf = new Configuration();
+      if (args.length > 0) {
+        String dataStoreClass = args[0];
+        inStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, hadoopConf);
+        if (args.length > 1) {
+          dataStoreClass = args[1];
+        }
+        outStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, MetricDatum.class, hadoopConf);
+      } else {
+        inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
+        outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
+      }
+      ...
+    }
+
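+For completeness, a minimal driver entry point might look like the sketch below. The class name `LogAnalyticsSpark` and the store class names in the comments are illustrative assumptions, not fixed by this tutorial:
+
+    // Hypothetical entry point wrapping the run() method above.
+    public static void main(String[] args) throws Exception {
+      // e.g. args[0] = "org.apache.gora.hbase.store.HBaseStore"
+      //      args[1] = "org.apache.gora.solr.store.SolrStore"
+      int ret = new LogAnalyticsSpark().run(args);
+      System.exit(ret);
+    }
+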
+Pass the input data store's key and value classes and instantiate a GoraSparkEngine:
+
+    GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class, Pageview.class);
+
+Construct a JavaSparkContext and register the input data store's value class as a Kryo class:
+
+    SparkConf sparkConf = new SparkConf().setAppName("Gora Spark Integration Application").setMaster("local");
+    Class[] c = new Class[1];
+    c[0] = inStore.getPersistentClass();
+    sparkConf.registerKryoClasses(c);
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+You can get a JavaPairRDD from the input data store:
+
+    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
+
+Once you have it, you can work on it just as if you were writing code for Spark! For example:
+
+    long count = goraRDD.count();
+    System.out.println("Total Log Count: " + count);
+
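+If you will run several actions on the same data, you can also cache the RDD or peek at a few records first. A purely illustrative sketch:
+
+    // Cache the RDD so repeated actions do not re-read the input store.
+    goraRDD.cache();
+    // Take a few records as a quick sanity check.
+    List<Tuple2<Long, Pageview>> sample = goraRDD.take(5);
+    System.out.println("First records: " + sample);
+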
+Here are the functions used in the map and reduce phases of this example:
+
+    /** The number of milliseconds in a day */
+    private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
+
+    /**
+     * map function used in calculation
+     */
+    private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc =
+        new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
+      @Override
+      public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+        String url = pageview.getUrl().toString();
+        Long day = getDay(pageview.getTimestamp());
+        Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
+        return new Tuple2<>(keyTuple, 1L);
+      }
+    };
+
+    /**
+     * reduce function used in calculation
+     */
+    private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+      @Override
+      public Long call(Long aLong, Long aLong2) throws Exception {
+        return aLong + aLong2;
+      }
+    };
+
+    /**
+     * metric function used after map phase
+     */
+    private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc =
+        new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
+      @Override
+      public Tuple2<String, MetricDatum> call(
+          Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
+        String dimension = tuple2LongTuple2._1()._1();
+        long timestamp = tuple2LongTuple2._1()._2();
+        MetricDatum metricDatum = new MetricDatum();
+        metricDatum.setMetricDimension(dimension);
+        metricDatum.setTimestamp(timestamp);
+        String key = metricDatum.getMetricDimension().toString();
+        key += "_" + Long.toString(timestamp);
+        metricDatum.setMetric(tuple2LongTuple2._2());
+        return new Tuple2<>(key, metricDatum);
+      }
+    };
+
+    /**
+     * Rolls up the given timestamp to the day cardinality, so that data can be
+     * aggregated daily
+     */
+    private static long getDay(long timeStamp) {
+      return (timeStamp / DAY_MILIS) * DAY_MILIS;
+    }
+
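+As a side note, if you build with Java 8, the same three functions can be written more compactly as lambdas, since Spark's Function interfaces have a single abstract method. An equivalent sketch, not part of the original example:
+
+    private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = pageview -> {
+      // Same logic as above: key each pageview by (url, day) with a count of 1.
+      Tuple2<String, Long> keyTuple = new Tuple2<>(pageview.getUrl().toString(), getDay(pageview.getTimestamp()));
+      return new Tuple2<>(keyTuple, 1L);
+    };
+
+    private static Function2<Long, Long, Long> redFunc = (sum, value) -> sum + value;
+
+    private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = tuple -> {
+      String dimension = tuple._1()._1();
+      long timestamp = tuple._1()._2();
+      MetricDatum metricDatum = new MetricDatum();
+      metricDatum.setMetricDimension(dimension);
+      metricDatum.setTimestamp(timestamp);
+      metricDatum.setMetric(tuple._2());
+      String key = metricDatum.getMetricDimension().toString() + "_" + Long.toString(timestamp);
+      return new Tuple2<>(key, metricDatum);
+    };
+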
+Here is how to run the map and reduce functions on the existing JavaPairRDD:
+
+    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
+    JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
+
+When you want to persist the result into the output data store (in our example, Solr), do it as follows:
+
+    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
+    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
+
+That's all! You can check Solr to verify the results.
+
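+For instance, a quick SolrJ query against the Metrics core can confirm that documents were written. The snippet below is only a verification sketch, not part of the example code:
+
+    // Query the Metrics core and print how many metric documents were indexed.
+    HttpSolrServer solr = new HttpSolrServer("http://localhost:8983/solr/Metrics");
+    QueryResponse response = solr.query(new SolrQuery("*:*"));
+    System.out.println("Indexed metric documents: " + response.getResults().getNumFound());
+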
##More Examples
Other than this tutorial, there are several places that you can find
examples of Gora in action.