Organize code and add documentation
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/80c0c26d Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/80c0c26d Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/80c0c26d Branch: refs/heads/master Commit: 80c0c26d8fb6a9a84ea39f5aa96cc343b4546266 Parents: 81af4d3 Author: Furkan KAMACI <[email protected]> Authored: Mon Jun 29 19:26:28 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Mon Jun 29 19:26:28 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/gora/spark/GoraSpark.java | 81 ----------------- .../org/apache/gora/spark/GoraSparkEngine.java | 96 ++++++++++++++++++++ .../gora/tutorial/log/LogAnalyticsSpark.java | 40 +++++++- 3 files changed, 131 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java deleted file mode 100644 index 02b5b39..0000000 --- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gora.spark; - -import java.io.IOException; - -import org.apache.gora.mapreduce.GoraInputFormat; -import org.apache.gora.mapreduce.GoraMapReduceUtils; -import org.apache.gora.persistency.Persistent; -import org.apache.gora.store.DataStore; -import org.apache.gora.util.IOUtils; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; - -/** - * Base class for Spark integration - */ -public class GoraSpark<K, V extends Persistent> { - Class<K> clazzK; - Class<V> clazzV; - - public GoraSpark(Class<K> clazzK, Class<V> clazzV) { - this.clazzK = clazzK; - this.clazzV = clazzV; - } - - public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, - Configuration conf, DataStore<K, V> dataStore) { - GoraMapReduceUtils.setIOSerializations(conf, true); - - try { - IOUtils - .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY); - } catch (IOException ioex) { - throw new RuntimeException(ioex.getMessage()); - } - - return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK, - clazzV); - } - - public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, - DataStore<K, V> dataStore) { - Configuration hadoopConf; - - if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) { - hadoopConf = ((Configurable) dataStore).getConf(); - } else { - hadoopConf = new Configuration(); - } - - GoraMapReduceUtils.setIOSerializations(hadoopConf, 
true); - - try { - IOUtils.storeToConf(dataStore.newQuery(), hadoopConf, - GoraInputFormat.QUERY_KEY); - } catch (IOException ioex) { - throw new RuntimeException(ioex.getMessage()); - } - - return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class, - clazzK, clazzV); - } -} http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java new file mode 100644 index 0000000..ced44be --- /dev/null +++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.spark; + +import java.io.IOException; + +import org.apache.gora.mapreduce.GoraInputFormat; +import org.apache.gora.mapreduce.GoraMapReduceUtils; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.IOUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Base class for Gora - Spark integration. + */ +public class GoraSparkEngine<K, V extends Persistent> { + Class<K> clazzK; + Class<V> clazzV; + + public GoraSparkEngine(Class<K> clazzK, Class<V> clazzV) { + this.clazzK = clazzK; + this.clazzV = clazzV; + } + + /** + * Initializes a {@link JavaPairRDD} from given Spark context, Hadoop + * configuration and data store. + * + * @param sparkContext + * Spark context + * @param conf + * Hadoop configuration + * @param dataStore + * Data store + * @return initialized rdd + */ + public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, + Configuration conf, DataStore<K, V> dataStore) { + GoraMapReduceUtils.setIOSerializations(conf, true); + + try { + IOUtils + .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY); + } catch (IOException ioex) { + throw new RuntimeException(ioex.getMessage()); + } + + return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK, + clazzV); + } + + /** + * Initializes a {@link JavaPairRDD} from given Spark context and data store. + * If the given data store is {@link Configurable} and already has a + * configuration, that configuration is used; otherwise a new Hadoop + * configuration is created.
+ * + * @param sparkContext + * Spark context + * @param dataStore + * Data store + * @return initialized rdd + */ + public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, + DataStore<K, V> dataStore) { + Configuration hadoopConf; + + if ((dataStore instanceof Configurable) + && ((Configurable) dataStore).getConf() != null) { + hadoopConf = ((Configurable) dataStore).getConf(); + } else { + hadoopConf = new Configuration(); + } + + return initialize(sparkContext, hadoopConf, dataStore); + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index d69649e..abced3f 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -17,7 +17,9 @@ */ package org.apache.gora.tutorial.log; -import org.apache.gora.spark.GoraSpark; +import java.util.Map; + +import org.apache.gora.spark.GoraSparkEngine; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; import org.apache.gora.tutorial.log.generated.MetricDatum; @@ -33,6 +35,21 @@ import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; +/** + * LogAnalyticsSpark is the tutorial class to illustrate Gora Spark API. The + * Spark job reads the web access data stored earlier by the {@link LogManager}, + * and calculates the aggregate daily pageviews. The output of the job is stored + * in a Gora compatible data store. + * + * This class illustrates the same functionality with {@link LogAnalytics} via + * Spark. 
+ * + * <p> + * See the tutorial.html file in docs or go to the <a + * href="http://incubator.apache.org/gora/docs/current/tutorial.html"> web + * site</a>for more information. + * </p> + */ public class LogAnalyticsSpark { private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>"; @@ -41,6 +58,9 @@ public class LogAnalyticsSpark { private static final long DAY_MILIS = 1000 * 60 * 60 * 24; // todo _fk consider using Kyro serialization + /** + * map function used in calculation + */ private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() { @Override public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) @@ -53,6 +73,9 @@ public class LogAnalyticsSpark { } }; + /** + * reduce function used in calculation + */ private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() { @Override public Long call(Long aLong, Long aLong2) throws Exception { @@ -60,6 +83,9 @@ public class LogAnalyticsSpark { } }; + /** + * metric function used after map phase + */ private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() { @Override public Tuple2<String, MetricDatum> call( @@ -102,7 +128,7 @@ public class LogAnalyticsSpark { } public int run(String inStoreClass, String outStoreClass) throws Exception { - GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class, + GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class, Pageview.class); SparkConf sparkConf = new SparkConf().setAppName( @@ -120,11 +146,10 @@ public class LogAnalyticsSpark { DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore( inStoreClass, Long.class, Pageview.class, hadoopConf); - JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc, - dataStore); + JavaPairRDD<Long, Pageview> goraRDD = 
goraSparkEngine.initialize(sc, dataStore); long count = goraRDD.count(); - System.out.println("Total Count: " + count); + System.out.println("Total Log Count: " + count); String firstOneURL = goraRDD.first()._2().getUrl().toString(); System.out.println(firstOneURL); @@ -137,6 +162,11 @@ public class LogAnalyticsSpark { System.out.println("MetricDatum count:" + reducedGoraRdd.count()); + Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap(); + for (String key : metricDatumMap.keySet()) { + System.out.println(key); + } + return 1; } }
