The code below is located in LogAnalyticsSpark.java.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/21955700 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/21955700 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/21955700 Branch: refs/heads/master Commit: 219557002f0e33ef6d0e2bb49471d24fb867b0ac Parents: cf6e765 Author: Furkan KAMACI <[email protected]> Authored: Mon Aug 17 21:21:43 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Mon Aug 17 21:21:43 2015 +0300 ---------------------------------------------------------------------- .../gora/tutorial/log/LogAnalyticsSpark.java | 51 +++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/21955700/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index 372c124..327bf7f 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -35,6 +35,8 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -54,6 +56,8 @@ import scala.Tuple2; */ public class LogAnalyticsSpark { + private static final Logger log = LoggerFactory.getLogger(LogAnalyticsSpark.class); + private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>"; /** The number of milliseconds in a day */ @@ -112,13 +116,15 @@ public class LogAnalyticsSpark { 
System.exit(1); } - String inStoreClass = args[0]; - String outStoreClass = args[1]; - LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark(); - int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass); - System.exit(ret); + try { + int ret = logAnalyticsSpark.run(args); + System.exit(ret); + } catch (Exception ex){ + log.error("Error occurred!"); + } + } /** @@ -129,27 +135,39 @@ public class LogAnalyticsSpark { return (timeStamp / DAY_MILIS) * DAY_MILIS; } - public int run(String inStoreClass, String outStoreClass) throws Exception { + public int run(String[] args) throws Exception { + + DataStore<Long, Pageview> inStore; + DataStore<String, MetricDatum> outStore; + Configuration hadoopConf = new Configuration(); + + if (args.length > 0) { + String dataStoreClass = args[0]; + inStore = DataStoreFactory.getDataStore( + dataStoreClass, Long.class, Pageview.class, hadoopConf); + if (args.length > 1) { + dataStoreClass = args[1]; + } + outStore = DataStoreFactory.getDataStore( + dataStoreClass, String.class, MetricDatum.class, hadoopConf); + } else { + inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf); + outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf); + } + + //Spark engine initialization GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class, Pageview.class); SparkConf sparkConf = new SparkConf().setAppName( "Gora Integration Application").setMaster("local"); - // todo _fk consider alternative architectural design - // todo design inStore and outStore initialization parts as like LogAnalytics.java - // todo consider creating job and manipulating it at input part as like LogAnalytics.java Class[] c = new Class[1]; c[0] = Pageview.class; sparkConf.registerKryoClasses(c); // JavaSparkContext sc = new JavaSparkContext(sparkConf); - Configuration hadoopConf = new Configuration(); - - DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore( - inStoreClass, 
Long.class, Pageview.class, hadoopConf); - JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore); long count = goraRDD.count(); @@ -176,9 +194,6 @@ public class LogAnalyticsSpark { // //write output to datastore - DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore( - outStoreClass, String.class, MetricDatum.class, hadoopConf); - GoraMapReduceUtils.setIOSerializations(hadoopConf, true); Job job = Job.getInstance(hadoopConf); @@ -198,6 +213,8 @@ public class LogAnalyticsSpark { inStore.close(); outStore.close(); + log.info("Log completed with success"); + return 1; } }
