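* The GoraSpark.java initialize method is renamed to initializeInput. * An architectural change is made: a new initializeInput overload obtains the Hadoop Configuration from the DataStore (when it is Configurable, otherwise a new Configuration is created), and LogAnalyticsSpark no longer extends Configured or implements Tool.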
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ef68cead Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ef68cead Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ef68cead Branch: refs/heads/master Commit: ef68cead273324797cf292dbe6da18ee3fd819cb Parents: 9c2d225 Author: Furkan KAMACI <[email protected]> Authored: Sun Jun 28 17:17:52 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Sun Jun 28 17:17:52 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/gora/spark/GoraSpark.java | 50 +++++++++++++++----- .../gora/tutorial/log/LogAnalyticsSpark.java | 44 ++++++++--------- 2 files changed, 60 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java index 690e32c..4279cfb 100644 --- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java +++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java @@ -24,6 +24,7 @@ import org.apache.gora.mapreduce.GoraMapReduceUtils; import org.apache.gora.persistency.Persistent; import org.apache.gora.store.DataStore; import org.apache.gora.util.IOUtils; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -37,19 +38,44 @@ public class GoraSpark<K, V extends Persistent> { public GoraSpark(Class<K> clazzK, Class<V> clazzV) { this.clazzK = clazzK; - this.clazzV = clazzV; + this.clazzV = clazzV; } - public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext, + public 
JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext, Configuration conf, DataStore<K, V> dataStore) { - GoraMapReduceUtils.setIOSerializations(conf, true); - try { - IOUtils.storeToConf(dataStore.newQuery(), conf, - GoraInputFormat.QUERY_KEY); - } catch (IOException ioex) { - throw new RuntimeException(ioex.getMessage()); - } - return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, - clazzK, clazzV); - } + GoraMapReduceUtils.setIOSerializations(conf, true); + + try { + IOUtils + .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY); + } catch (IOException ioex) { + throw new RuntimeException(ioex.getMessage()); + } + + return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK, + clazzV); + } + + public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext, + DataStore<K, V> dataStore) { + Configuration hadoopConf; + + if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) { + hadoopConf = ((Configurable) dataStore).getConf(); + } else { + hadoopConf = new Configuration(); + } + + GoraMapReduceUtils.setIOSerializations(hadoopConf, true); + + try { + IOUtils.storeToConf(dataStore.newQuery(), hadoopConf, + GoraInputFormat.QUERY_KEY); + } catch (IOException ioex) { + throw new RuntimeException(ioex.getMessage()); + } + + return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class, + clazzK, clazzV); + } } http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java index 214b130..9b28fd7 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java +++ 
b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -21,44 +21,44 @@ import org.apache.gora.spark.GoraSpark; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; import org.apache.gora.tutorial.log.generated.Pageview; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -public class LogAnalyticsSpark extends Configured implements Tool { +public class LogAnalyticsSpark { private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>"; - private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark(); public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println(USAGE); - System.exit(1); + if (args.length < 2) { + System.err.println(USAGE); + System.exit(1); } - // run as any other MR job - int ret = ToolRunner.run(logAnalyticsSpark, args); - System.exit(ret); + + String inStoreClass = args[0]; + String outStoreClass = args[1]; + + LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark(); + int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass); + + System.exit(ret); } - @Override - public int run(String[] args) throws Exception { - GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>( - Long.class, Pageview.class); + public int run(String inStoreClass, String outStoreClass) throws Exception { + GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class, + Pageview.class); - SparkConf conf = new SparkConf().setAppName( + SparkConf sparkConf = new SparkConf().setAppName( "Gora Integration Application").setMaster("local"); - JavaSparkContext sc = new JavaSparkContext(conf); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + + 
Configuration hadoopConf = new Configuration(); - String dataStoreClass = args[0]; DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore( - dataStoreClass, Long.class, Pageview.class, - logAnalyticsSpark.getConf()); + inStoreClass, Long.class, Pageview.class, hadoopConf); - JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark - .initialize(sc, logAnalyticsSpark.getConf(), dataStore); + JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore); // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> // cachedGoraRdd = goraRDD.cache();
