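* GoraSparkEngine.java: add setOutput() helpers that configure a Hadoop Job for GoraOutputFormat (output format, output key/value classes, data store class), and use them in LogAnalyticsSpark instead of the inline job setup.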
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/62be0c31 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/62be0c31 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/62be0c31 Branch: refs/heads/master Commit: 62be0c312927e3ea4962eb966689b49dc1fcebce Parents: 2195570 Author: Furkan KAMACI <[email protected]> Authored: Mon Aug 17 22:25:12 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Mon Aug 17 22:25:12 2015 +0300 ---------------------------------------------------------------------- .../org/apache/gora/spark/GoraSparkEngine.java | 41 ++++++++++++++++++++ .../gora/tutorial/log/LogAnalyticsSpark.java | 19 ++------- 2 files changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java index ced44be..9c64542 100644 --- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java +++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java @@ -21,11 +21,13 @@ import java.io.IOException; import org.apache.gora.mapreduce.GoraInputFormat; import org.apache.gora.mapreduce.GoraMapReduceUtils; +import org.apache.gora.mapreduce.GoraOutputFormat; import org.apache.gora.persistency.Persistent; import org.apache.gora.store.DataStore; import org.apache.gora.util.IOUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -93,4 +95,43 @@ public class GoraSparkEngine<K, V extends Persistent> { return 
initialize(sparkContext, hadoopConf, dataStore); } + + /** + * Sets the output parameters for the job + * @param job the job to set the properties for + * @param dataStore the datastore as the output + * @param reuseObjects whether to reuse objects in serialization + */ + public <K, V extends Persistent> Configuration setOutput(Job job, + DataStore<K, V> dataStore, boolean reuseObjects) { + return setOutput(job, dataStore.getClass(), dataStore.getKeyClass(), + dataStore.getPersistentClass(), reuseObjects); + } + + /** + * Sets the output parameters for the job + * + * @param job the job to set the properties for + * @param dataStoreClass the datastore class + * @param keyClass output key class + * @param persistentClass output value class + * @param reuseObjects whether to reuse objects in serialization + */ + @SuppressWarnings("rawtypes") + public <K, V extends Persistent> Configuration setOutput(Job job, + Class<? extends DataStore> dataStoreClass, + Class<K> keyClass, Class<V> persistentClass, + boolean reuseObjects) { + + job.setOutputFormatClass(GoraOutputFormat.class); + job.setOutputKeyClass(keyClass); + job.setOutputValueClass(persistentClass); + + job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, dataStoreClass, + DataStore.class); + job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, keyClass, Object.class); + job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS, + persistentClass, Persistent.class); + return job.getConfiguration(); + } } http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index 327bf7f..d441de7 100644 --- 
a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -63,7 +63,6 @@ public class LogAnalyticsSpark { /** The number of milliseconds in a day */ private static final long DAY_MILIS = 1000 * 60 * 60 * 24; - // todo _fk consider using Kyro serialization /** * map function used in calculation */ @@ -173,9 +172,6 @@ public class LogAnalyticsSpark { long count = goraRDD.count(); System.out.println("Total Log Count: " + count); - String firstOneURL = goraRDD.first()._2().getUrl().toString(); - System.out.println("First entry's first URL:" + firstOneURL); - JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD .values().map(mapFunc); @@ -184,7 +180,7 @@ public class LogAnalyticsSpark { System.out.println("MetricDatum count:" + reducedGoraRdd.count()); - //print screen output + //Print output for debug purpose /* Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap(); for (String key : metricDatumMap.keySet()) { @@ -195,19 +191,10 @@ public class LogAnalyticsSpark { //write output to datastore GoraMapReduceUtils.setIOSerializations(hadoopConf, true); - Job job = Job.getInstance(hadoopConf); - job.setOutputFormatClass(GoraOutputFormat.class); - job.setOutputKeyClass(outStore.getKeyClass()); - job.setOutputValueClass(outStore.getPersistentClass()); - - job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, outStore.getClass(), - DataStore.class); - job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, outStore.getKeyClass(), Object.class); - job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS, - outStore.getPersistentClass(), Persistent.class); - reducedGoraRdd.saveAsNewAPIHadoopDataset(job.getConfiguration()); + Configuration sparkHadoopConf = goraSparkEngine.setOutput(job, outStore, true); + reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf); // inStore.close();
