Repository: gora Updated Branches: refs/heads/master 130257370 -> ea44388f9
* JavaPairRDD support for GoraInputFormat. Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/9c2d225d Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/9c2d225d Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/9c2d225d Branch: refs/heads/master Commit: 9c2d225d04cfa746244373fa661a1aa6f03250bb Parents: bb09d89 Author: Furkan KAMACI <[email protected]> Authored: Sun Jun 28 01:57:51 2015 +0300 Committer: Furkan KAMACI <[email protected]> Committed: Sun Jun 28 01:57:51 2015 +0300 ---------------------------------------------------------------------- gora-core/pom.xml | 7 ++ .../java/org/apache/gora/spark/GoraSpark.java | 55 ++++++++++++++++ .../gora/tutorial/log/LogAnalyticsSpark.java | 69 ++++++++++++++++++++ pom.xml | 4 ++ 4 files changed, 135 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/pom.xml ---------------------------------------------------------------------- diff --git a/gora-core/pom.xml b/gora-core/pom.xml index eab5330..5f147fb 100644 --- a/gora-core/pom.xml +++ b/gora-core/pom.xml @@ -141,6 +141,13 @@ <artifactId>guava</artifactId> </dependency> + <!-- Spark dependency --> + <dependency> <!-- Spark dependency --> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>1.3.1</version> + </dependency> + <!-- Logging Dependencies --> <dependency> <groupId>log4j</groupId> http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java new file mode 100644 index 0000000..690e32c --- /dev/null +++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java @@ -0,0 +1,55 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gora.spark;

import java.io.IOException;

import org.apache.gora.mapreduce.GoraInputFormat;
import org.apache.gora.mapreduce.GoraMapReduceUtils;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
import org.apache.gora.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Base class for Spark integration: exposes the records of a Gora
 * {@link DataStore} as a Spark {@link JavaPairRDD} by reading them through
 * {@link GoraInputFormat}.
 *
 * @param <K> key class of the data store
 * @param <V> persistent value class of the data store
 */
public class GoraSpark<K, V extends Persistent> {
  // Key class handed to Spark as the RDD key class.
  final Class<K> clazzK;
  // Value class handed to Spark as the RDD value class.
  final Class<V> clazzV;

  /**
   * @param clazzK key class used for the RDD keys
   * @param clazzV persistent class used for the RDD values
   */
  public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
    this.clazzK = clazzK;
    this.clazzV = clazzV;
  }

  /**
   * Creates a pair RDD over the records of {@code dataStore}.
   *
   * <p>Side effect: {@code conf} is modified in place. IO serializations are
   * configured on it via {@link GoraMapReduceUtils#setIOSerializations}, and a
   * fresh query from {@link DataStore#newQuery()} is serialized into it under
   * {@link GoraInputFormat#QUERY_KEY}. No constraints are set on that query
   * here, so presumably the whole store is read (TODO confirm).
   *
   * @param sparkContext Spark context used to create the RDD
   * @param conf Hadoop configuration handed to {@link GoraInputFormat}; mutated
   * @param dataStore store whose query becomes the RDD input
   * @return key/value pairs read through {@link GoraInputFormat}
   * @throws RuntimeException if the query cannot be stored in {@code conf};
   *         the underlying {@link IOException} is preserved as the cause
   */
  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
      Configuration conf, DataStore<K, V> dataStore) {
    GoraMapReduceUtils.setIOSerializations(conf, true);
    try {
      IOUtils.storeToConf(dataStore.newQuery(), conf,
          GoraInputFormat.QUERY_KEY);
    } catch (IOException ioex) {
      // Keep the original message for compatibility, but never drop the cause.
      throw new RuntimeException(ioex.getMessage(), ioex);
    }
    // GoraInputFormat.class is a raw class literal, so this call compiles with
    // an unchecked warning.
    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
        clazzK, clazzV);
  }
}
http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java new file mode 100644 index 0000000..214b130 --- /dev/null +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.tutorial.log; + +import org.apache.gora.spark.GoraSpark; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.tutorial.log.generated.Pageview; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; + +public class LogAnalyticsSpark extends Configured implements Tool { + + private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>"; + private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark(); + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println(USAGE); + System.exit(1); + } + // run as any other MR job + int ret = ToolRunner.run(logAnalyticsSpark, args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>( + Long.class, Pageview.class); + + SparkConf conf = new SparkConf().setAppName( + "Gora Integration Application").setMaster("local"); + JavaSparkContext sc = new JavaSparkContext(conf); + + String dataStoreClass = args[0]; + DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore( + dataStoreClass, Long.class, Pageview.class, + logAnalyticsSpark.getConf()); + + JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark + .initialize(sc, logAnalyticsSpark.getConf(), dataStore); + // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> + // cachedGoraRdd = goraRDD.cache(); + + long count = goraRDD.count(); + System.out.println("Total Count: " + count); + return 1; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/pom.xml ---------------------------------------------------------------------- diff 
--git a/pom.xml b/pom.xml index 01a7fee..a6261ca 100644 --- a/pom.xml +++ b/pom.xml @@ -893,6 +893,10 @@ <groupId>ant</groupId> <artifactId>ant</artifactId> </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> </exclusions> </dependency>
