GEODE-37 change package name from io.pivotal.geode (for ./geode-spark-connector/src/main/java/io/pivotal) to org.apache.geode (for ./geode-spark-connector/src/main/java/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f7eaa26c Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f7eaa26c Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f7eaa26c Branch: refs/heads/develop Commit: f7eaa26c0009a8a4ec4f0c692de2c07c1ca4eb78 Parents: 03e60a6 Author: Hitesh Khamesra <[email protected]> Authored: Tue Sep 20 15:44:10 2016 -0700 Committer: Hitesh Khamesra <[email protected]> Committed: Tue Sep 20 16:01:02 2016 -0700 ---------------------------------------------------------------------- .../javaapi/GeodeJavaDStreamFunctions.java | 86 ------- .../javaapi/GeodeJavaPairDStreamFunctions.java | 77 ------ .../javaapi/GeodeJavaPairRDDFunctions.java | 238 ------------------- .../javaapi/GeodeJavaRDDFunctions.java | 178 -------------- .../javaapi/GeodeJavaSQLContextFunctions.java | 49 ---- .../javaapi/GeodeJavaSparkContextFunctions.java | 87 ------- .../spark/connector/javaapi/GeodeJavaUtil.java | 122 ---------- .../javaapi/GeodeJavaDStreamFunctions.java | 86 +++++++ .../javaapi/GeodeJavaPairDStreamFunctions.java | 77 ++++++ .../javaapi/GeodeJavaPairRDDFunctions.java | 238 +++++++++++++++++++ .../javaapi/GeodeJavaRDDFunctions.java | 178 ++++++++++++++ .../javaapi/GeodeJavaSQLContextFunctions.java | 49 ++++ .../javaapi/GeodeJavaSparkContextFunctions.java | 87 +++++++ .../spark/connector/javaapi/GeodeJavaUtil.java | 122 ++++++++++ 14 files changed, 837 insertions(+), 837 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java ---------------------------------------------------------------------- diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java deleted file mode 100644 index e7c7cf9..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.javaapi; - -import io.pivotal.geode.spark.connector.GeodeConnectionConf; -import io.pivotal.geode.spark.connector.streaming.GeodeDStreamFunctions; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.api.java.JavaDStream; -import java.util.Properties; - -import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream} - * to provide Geode Spark Connector functionality. 
- * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> - */ -public class GeodeJavaDStreamFunctions<T> { - - public final GeodeDStreamFunctions<T> dsf; - - public GeodeJavaDStreamFunctions(JavaDStream<T> ds) { - this.dsf = new GeodeDStreamFunctions<T>(ds.dstream()); - } - - /** - * Save the JavaDStream to Geode key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param opConf the optional parameters for this operation - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) { - dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaDStream to Geode key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - * @param opConf the optional parameters for this operation - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func, Properties opConf) { - dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaDStream to Geode key-value store. 
- * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) { - dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap()); - } - - /** - * Save the JavaDStream to Geode key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func) { - dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java deleted file mode 100644 index 2c83255..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.javaapi; - -import io.pivotal.geode.spark.connector.GeodeConnectionConf; -import io.pivotal.geode.spark.connector.streaming.GeodePairDStreamFunctions; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import java.util.Properties; - -import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream} - * to provide Geode Spark Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> - */ -public class GeodeJavaPairDStreamFunctions<K, V> { - - public final GeodePairDStreamFunctions<K, V> dsf; - - public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) { - this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream()); - } - - /** - * Save the JavaPairDStream to Geode key-value store. 
- * @param regionPath the full path of region that the DStream is stored - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param opConf the optional parameters for this operation - */ - public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) { - dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaPairDStream to Geode key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - */ - public void saveToGeode(String regionPath, GeodeConnectionConf connConf) { - dsf.saveToGeode(regionPath, connConf, emptyStrStrMap()); - } - - /** - * Save the JavaPairDStream to Geode key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param opConf the optional parameters for this operation - */ - public void saveToGeode(String regionPath, Properties opConf) { - dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaPairDStream to Geode key-value store. 
- * @param regionPath the full path of region that the DStream is stored - */ - public void saveToGeode(String regionPath) { - dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java deleted file mode 100644 index 3278a5b..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.javaapi; - -import io.pivotal.geode.spark.connector.GeodeConnectionConf; -import io.pivotal.geode.spark.connector.GeodePairRDDFunctions; -import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD; -import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.Function; -import scala.Option; -import scala.Tuple2; -import scala.reflect.ClassTag; - -import java.util.Properties; - -import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark - * Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> - */ -public class GeodeJavaPairRDDFunctions<K, V> { - - public final GeodePairRDDFunctions<K, V> rddf; - - public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) { - this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd()); - } - - /** - * Save the pair RDD to Geode key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param opConf the parameters for this operation - */ - public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) { - rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the pair RDD to Geode key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param opConf the parameters for this operation - */ - public void saveToGeode(String regionPath, Properties opConf) { - rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the pair RDD to Geode key-value store. 
- * @param regionPath the full path of region that the RDD is stored - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - */ - public void saveToGeode(String regionPath, GeodeConnectionConf connConf) { - rddf.saveToGeode(regionPath, connConf, emptyStrStrMap()); - } - - /** - * Save the pair RDD to Geode key-value store with the default GeodeConnector. - * @param regionPath the full path of region that the RDD is stored - */ - public void saveToGeode(String regionPath) { - rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap()); - } - - /** - * Return an JavaPairRDD containing all pairs of elements with matching keys in - * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements - * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and - * (k, v2) is in the Geode region. - * - * @param regionPath the region path of the Geode region - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<<K, V>, V2> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) { - return joinGeodeRegion(regionPath, rddf.defaultConnectionConf()); - } - - /** - * Return an JavaPairRDD containing all pairs of elements with matching keys in - * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements - * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and - * (k, v2) is in the Geode region. 
- * - * @param regionPath the region path of the Geode region - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<<K, V>, V2> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion( - String regionPath, GeodeConnectionConf connConf) { - GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<V2> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD - * element is generated by `func(K, V) => K2`, and the key from the Geode - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, - * where (k, v) is in this RDD and (k2, v2) is in the Geode region. - * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element (K, V) - * @param <K2> the key type of the Geode region - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<Tuple2<K, V>, V2> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion( - String regionPath, Function<Tuple2<K, V>, K2> func) { - return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD - * element is generated by `func(K, V) => K2`, and the key from the Geode - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, - * where (k, v) is in this RDD and (k2, v2) is in the Geode region. 
- * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element (K, V) - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param <K2> the key type of the Geode region - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<Tuple2<K, V>, V2> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion( - String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) { - GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<V2> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`. - * For each element (k, v) in this RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair - * ((k, v), None)) if no element in the Geode region have key k. - * - * @param regionPath the region path of the Geode region - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) { - return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf()); - } - - /** - * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`. - * For each element (k, v) in this RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair - * ((k, v), None)) if no element in the Geode region have key k. 
- * - * @param regionPath the region path of the Geode region - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion( - String regionPath, GeodeConnectionConf connConf) { - GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<Option<V2>> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`. - * The join key from RDD element is generated by `func(K, V) => K2`, and the - * key from region is just the key of the key/value pair. - * - * For each element (k, v) in `this` RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair - * ((k, v), None)) if no element in the Geode region have key `func(k, v)`. - * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element (K, V) - * @param <K2> the key type of the Geode region - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion( - String regionPath, Function<Tuple2<K, V>, K2> func) { - return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`. - * The join key from RDD element is generated by `func(K, V) => K2`, and the - * key from region is just the key of the key/value pair. 
- * - * For each element (k, v) in `this` RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair - * ((k, v), None)) if no element in the Geode region have key `func(k, v)`. - * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element (K, V) - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param <K2> the key type of the Geode region - * @param <V2> the value type of the Geode region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion( - String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) { - GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<Option<V2>> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java deleted file mode 100644 index e4f6f36..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.javaapi; - -import io.pivotal.geode.spark.connector.GeodeConnectionConf; -import io.pivotal.geode.spark.connector.GeodeRDDFunctions; -import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD; -import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import scala.Option; -import scala.reflect.ClassTag; - -import java.util.Properties; - -import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark - * Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> - */ -public class GeodeJavaRDDFunctions<T> { - - public final GeodeRDDFunctions<T> rddf; - - public GeodeJavaRDDFunctions(JavaRDD<T> rdd) { - this.rddf = new GeodeRDDFunctions<T>(rdd.rdd()); - } - - /** - * Save the non-pair RDD to Geode key-value store. 
- * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param opConf the parameters for this operation - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) { - rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the non-pair RDD to Geode key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) { - rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap()); - } - - /** - * Save the non-pair RDD to Geode key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - * @param opConf the parameters for this operation - */ - public <K, V> void saveToGeode( - String regionPath, PairFunction<T, K, V> func, Properties opConf) { - rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the non-pair RDD to Geode key-value store with default GeodeConnector. 
- * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - */ - public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) { - rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap()); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<T> and the Geode `Region<K, V>`. The join key from RDD - * element is generated by `func(T) => K`, and the key from the Geode - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a (t, v2) tuple, - * where t is from this RDD and v is from the Geode region. - * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element T - * @param <K> the key type of the Geode region - * @param <V> the value type of the Geode region - * @return JavaPairRDD<T, V> - */ - public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) { - return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<T> and the Geode `Region<K, V>`. The join key from RDD - * element is generated by `func(T) => K`, and the key from the Geode - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a (t, v2) tuple, - * where t is from this RDD and v is from the Geode region. 
- * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element T - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param <K> the key type of the Geode region - * @param <V> the value type of the Geode region - * @return JavaPairRDD<T, V> - */ - public <K, V> JavaPairRDD<T, V> joinGeodeRegion( - String regionPath, Function<T, K> func, GeodeConnectionConf connConf) { - GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf); - ClassTag<T> kt = fakeClassTag(); - ClassTag<V> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`. - * The join key from RDD element is generated by `func(T) => K`, and the - * key from region is just the key of the key/value pair. - * - * For each element (t) in this RDD, the resulting RDD will either contain - * all pairs (t, Some(v)) for v in the Geode region, or the pair - * (t, None) if no element in the Geode region have key `func(t)`. - * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element T - * @param <K> the key type of the Geode region - * @param <V> the value type of the Geode region - * @return JavaPairRDD<T, Option<V>> - */ - public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) { - return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`. - * The join key from RDD element is generated by `func(T) => K`, and the - * key from region is just the key of the key/value pair. 
- * - * For each element (t) in this RDD, the resulting RDD will either contain - * all pairs (t, Some(v)) for v in the Geode region, or the pair - * (t, None) if no element in the Geode region have key `func(t)`. - * - * @param regionPath the region path of the Geode region - * @param func the function that generates region key from RDD element T - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param <K> the key type of the Geode region - * @param <V> the value type of the Geode region - * @return JavaPairRDD<T, Option<V>> - */ - public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion( - String regionPath, Function<T, K> func, GeodeConnectionConf connConf) { - GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf); - ClassTag<T> kt = fakeClassTag(); - ClassTag<Option<V>> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java deleted file mode 100644 index 3471bf90..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.javaapi; - -import io.pivotal.geode.spark.connector.GeodeConnectionConf; -import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.SQLContext; - -/** - * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode - * OQL functionality. - * - * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> - */ -public class GeodeJavaSQLContextFunctions { - - public final GeodeSQLContextFunctions scf; - - public GeodeJavaSQLContextFunctions(SQLContext sqlContext) { - scf = new GeodeSQLContextFunctions(sqlContext); - } - - public <T> DataFrame geodeOQL(String query) { - DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf()); - return df; - } - - public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) { - DataFrame df = scf.geodeOQL(query, connConf); - return df; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java ---------------------------------------------------------------------- diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java deleted file mode 100644 index ce6b1ff..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.javaapi; - - -import io.pivotal.geode.spark.connector.GeodeConnectionConf; -import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD; -import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$; -import org.apache.spark.SparkContext; -import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; - -import scala.reflect.ClassTag; -import java.util.Properties; - -/** - * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode - * Connector functionality. 
- * - * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> - */ -public class GeodeJavaSparkContextFunctions { - - public final SparkContext sc; - - public GeodeJavaSparkContextFunctions(SparkContext sc) { - this.sc = sc; - } - - /** - * Expose a Geode region as a JavaPairRDD - * @param regionPath the full path of the region - * @param connConf the GeodeConnectionConf that can be used to access the region - * @param opConf the parameters for this operation, such as preferred partitioner. - */ - public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion( - String regionPath, GeodeConnectionConf connConf, Properties opConf) { - ClassTag<K> kt = fakeClassTag(); - ClassTag<V> vt = fakeClassTag(); - GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply( - sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt); - return new GeodeJavaRegionRDD<>(rdd); - } - - /** - * Expose a Geode region as a JavaPairRDD with default GeodeConnector and no preferred partitioner. - * @param regionPath the full path of the region - */ - public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) { - GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf()); - return geodeRegion(regionPath, connConf, new Properties()); - } - - /** - * Expose a Geode region as a JavaPairRDD with no preferred partitioner. - * @param regionPath the full path of the region - * @param connConf the GeodeConnectionConf that can be used to access the region - */ - public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) { - return geodeRegion(regionPath, connConf, new Properties()); - } - - /** - * Expose a Geode region as a JavaPairRDD with default GeodeConnector. - * @param regionPath the full path of the region - * @param opConf the parameters for this operation, such as preferred partitioner. 
- */ - public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) { - GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf()); - return geodeRegion(regionPath, connConf, opConf); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java deleted file mode 100644 index 41fe7e5..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.javaapi; - -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import scala.Tuple2; - -import io.pivotal.geode.spark.connector.package$; - -/** - * The main entry point to Spark Geode Connector Java API. - * - * There are several helpful static factory methods which build useful wrappers - * around Spark Context, Streaming Context and RDD. There are also helper methods - * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>. - */ -public final class GeodeJavaUtil { - - /** constants */ - public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey(); - // partitioner related keys and values - public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey(); - public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey(); - public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName(); - public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName(); - public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey(); - public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault(); - - /** The private constructor is used prevents user from creating instance of this class. */ - private GeodeJavaUtil() { } - - /** - * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based - * on an existing {@link SparkContext} instance. 
- */ - public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) { - return new GeodeJavaSparkContextFunctions(sc); - } - - /** - * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based - * on an existing {@link JavaSparkContext} instance. - */ - public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) { - return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc)); - } - - /** - * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an - * existing {@link org.apache.spark.api.java.JavaPairRDD} instance. - */ - public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) { - return new GeodeJavaPairRDDFunctions<K, V>(rdd); - } - - /** - * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an - * existing {@link org.apache.spark.api.java.JavaRDD} instance. - */ - public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) { - return new GeodeJavaRDDFunctions<T>(rdd); - } - - /** - * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an - * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance. - */ - public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) { - return new GeodeJavaPairDStreamFunctions<>(ds); - } - - /** - * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an - * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance. - */ - public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) { - return new GeodeJavaDStreamFunctions<>(ds); - } - - /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<<Tuple2<K, V>> - * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>. 
- */ - public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) { - return JavaAPIHelper.toJavaPairRDD(rdd); - } - - /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<<Tuple2<K, V>> - * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>. - */ - public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) { - return JavaAPIHelper.toJavaPairDStream(ds); - } - - /** - * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based - * on an existing {@link SQLContext} instance. - */ - public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) { - return new GeodeJavaSQLContextFunctions(sqlContext); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java new file mode 100644 index 0000000..e7c7cf9 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.javaapi; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.streaming.GeodeDStreamFunctions; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.api.java.JavaDStream; +import java.util.Properties; + +import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; + +/** + * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream} + * to provide Geode Spark Connector functionality. + * + * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link + * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> + */ +public class GeodeJavaDStreamFunctions<T> { + + public final GeodeDStreamFunctions<T> dsf; + + public GeodeJavaDStreamFunctions(JavaDStream<T> ds) { + this.dsf = new GeodeDStreamFunctions<T>(ds.dstream()); + } + + /** + * Save the JavaDStream to Geode key-value store. 
+ * @param regionPath the full path of region that the DStream is stored + * @param func the PairFunction that converts elements of JavaDStream to key/value pairs + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the optional parameters for this operation + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) { + dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaDStream to Geode key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param func the PairFunction that converts elements of JavaDStream to key/value pairs + * @param opConf the optional parameters for this operation + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func, Properties opConf) { + dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaDStream to Geode key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param func the PairFunction that converts elements of JavaDStream to key/value pairs + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) { + dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap()); + } + + /** + * Save the JavaDStream to Geode key-value store. 
+ * @param regionPath the full path of region that the DStream is stored + * @param func the PairFunction that converts elements of JavaDStream to key/value pairs + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func) { + dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java new file mode 100644 index 0000000..2c83255 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.javaapi; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.streaming.GeodePairDStreamFunctions; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import java.util.Properties; + +import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; + +/** + * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream} + * to provide Geode Spark Connector functionality. + * + * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link + * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> + */ +public class GeodeJavaPairDStreamFunctions<K, V> { + + public final GeodePairDStreamFunctions<K, V> dsf; + + public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) { + this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream()); + } + + /** + * Save the JavaPairDStream to Geode key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the optional parameters for this operation + */ + public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) { + dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaPairDStream to Geode key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + */ + public void saveToGeode(String regionPath, GeodeConnectionConf connConf) { + dsf.saveToGeode(regionPath, connConf, emptyStrStrMap()); + } + + /** + * Save the JavaPairDStream to Geode key-value store. 
+ * @param regionPath the full path of region that the DStream is stored + * @param opConf the optional parameters for this operation + */ + public void saveToGeode(String regionPath, Properties opConf) { + dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaPairDStream to Geode key-value store. + * @param regionPath the full path of region that the DStream is stored + */ + public void saveToGeode(String regionPath) { + dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java new file mode 100644 index 0000000..3278a5b --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.javaapi; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.GeodePairRDDFunctions; +import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD; +import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; +import scala.Option; +import scala.Tuple2; +import scala.reflect.ClassTag; + +import java.util.Properties; + +import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; + +/** + * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark + * Connector functionality. + * + * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link + * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> + */ +public class GeodeJavaPairRDDFunctions<K, V> { + + public final GeodePairRDDFunctions<K, V> rddf; + + public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) { + this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd()); + } + + /** + * Save the pair RDD to Geode key-value store. 
+ * @param regionPath the full path of region that the RDD is stored + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the parameters for this operation + */ + public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) { + rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the pair RDD to Geode key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param opConf the parameters for this operation + */ + public void saveToGeode(String regionPath, Properties opConf) { + rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); + } + + /** + * Save the pair RDD to Geode key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + */ + public void saveToGeode(String regionPath, GeodeConnectionConf connConf) { + rddf.saveToGeode(regionPath, connConf, emptyStrStrMap()); + } + + /** + * Save the pair RDD to Geode key-value store with the default GeodeConnector. + * @param regionPath the full path of region that the RDD is stored + */ + public void saveToGeode(String regionPath) { + rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap()); + } + + /** + * Return an JavaPairRDD containing all pairs of elements with matching keys in + * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements + * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and + * (k, v2) is in the Geode region. 
+ * + * @param regionPath the region path of the Geode region + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<<K, V>, V2> + */ + public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) { + return joinGeodeRegion(regionPath, rddf.defaultConnectionConf()); + } + + /** + * Return an JavaPairRDD containing all pairs of elements with matching keys in + * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements + * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and + * (k, v2) is in the Geode region. + * + * @param regionPath the region path of the Geode region + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<<K, V>, V2> + */ + public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion( + String regionPath, GeodeConnectionConf connConf) { + GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf); + ClassTag<Tuple2<K, V>> kt = fakeClassTag(); + ClassTag<V2> vt = fakeClassTag(); + return new JavaPairRDD<>(rdd, kt, vt); + } + + /** + * Return an RDD containing all pairs of elements with matching keys in this + * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD + * element is generated by `func(K, V) => K2`, and the key from the Geode + * region is just the key of the key/value pair. + * + * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, + * where (k, v) is in this RDD and (k2, v2) is in the Geode region. 
+ * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element (K, V) + * @param <K2> the key type of the Geode region + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<Tuple2<K, V>, V2> + */ + public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion( + String regionPath, Function<Tuple2<K, V>, K2> func) { + return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); + } + + /** + * Return an RDD containing all pairs of elements with matching keys in this + * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from RDD + * element is generated by `func(K, V) => K2`, and the key from the Geode + * region is just the key of the key/value pair. + * + * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, + * where (k, v) is in this RDD and (k2, v2) is in the Geode region. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element (K, V) + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param <K2> the key type of the Geode region + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<Tuple2<K, V>, V2> + */ + public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion( + String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) { + GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf); + ClassTag<Tuple2<K, V>> kt = fakeClassTag(); + ClassTag<V2> vt = fakeClassTag(); + return new JavaPairRDD<>(rdd, kt, vt); + } + + /** + * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`. + * For each element (k, v) in this RDD, the resulting RDD will either contain + * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair + * ((k, v), None)) if no element in the Geode region have key k. 
+ * + * @param regionPath the region path of the Geode region + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<Tuple2<K, V>, Option<V>> + */ + public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) { + return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf()); + } + + /** + * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`. + * For each element (k, v) in this RDD, the resulting RDD will either contain + * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair + * ((k, v), None)) if no element in the Geode region have key k. + * + * @param regionPath the region path of the Geode region + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<Tuple2<K, V>, Option<V>> + */ + public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion( + String regionPath, GeodeConnectionConf connConf) { + GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf); + ClassTag<Tuple2<K, V>> kt = fakeClassTag(); + ClassTag<Option<V2>> vt = fakeClassTag(); + return new JavaPairRDD<>(rdd, kt, vt); + } + + /** + * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`. + * The join key from RDD element is generated by `func(K, V) => K2`, and the + * key from region is just the key of the key/value pair. + * + * For each element (k, v) in `this` RDD, the resulting RDD will either contain + * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair + * ((k, v), None)) if no element in the Geode region have key `func(k, v)`. 
+ * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element (K, V) + * @param <K2> the key type of the Geode region + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<Tuple2<K, V>, Option<V>> + */ + public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion( + String regionPath, Function<Tuple2<K, V>, K2> func) { + return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); + } + + /** + * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`. + * The join key from RDD element is generated by `func(K, V) => K2`, and the + * key from region is just the key of the key/value pair. + * + * For each element (k, v) in `this` RDD, the resulting RDD will either contain + * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair + * ((k, v), None)) if no element in the Geode region have key `func(k, v)`. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element (K, V) + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param <K2> the key type of the Geode region + * @param <V2> the value type of the Geode region + * @return JavaPairRDD<Tuple2<K, V>, Option<V>> + */ + public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion( + String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) { + GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf); + ClassTag<Tuple2<K, V>> kt = fakeClassTag(); + ClassTag<Option<V2>> vt = fakeClassTag(); + return new JavaPairRDD<>(rdd, kt, vt); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java 
---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java new file mode 100644 index 0000000..e4f6f36 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.javaapi; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.GeodeRDDFunctions; +import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD; +import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import scala.Option; +import scala.reflect.ClassTag; + +import java.util.Properties; + +import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; + +/** + * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark + * Connector functionality. + * + * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link + * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> + */ +public class GeodeJavaRDDFunctions<T> { + + public final GeodeRDDFunctions<T> rddf; + + public GeodeJavaRDDFunctions(JavaRDD<T> rdd) { + this.rddf = new GeodeRDDFunctions<T>(rdd.rdd()); + } + + /** + * Save the non-pair RDD to Geode key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param func the PairFunction that converts elements of JavaRDD to key/value pairs + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the parameters for this operation + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) { + rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the non-pair RDD to Geode key-value store. 
+ * @param regionPath the full path of region that the RDD is stored + * @param func the PairFunction that converts elements of JavaRDD to key/value pairs + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) { + rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap()); + } + + /** + * Save the non-pair RDD to Geode key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param func the PairFunction that converts elements of JavaRDD to key/value pairs + * @param opConf the parameters for this operation + */ + public <K, V> void saveToGeode( + String regionPath, PairFunction<T, K, V> func, Properties opConf) { + rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); + } + + /** + * Save the non-pair RDD to Geode key-value store with default GeodeConnector. + * @param regionPath the full path of region that the RDD is stored + * @param func the PairFunction that converts elements of JavaRDD to key/value pairs + */ + public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) { + rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap()); + } + + /** + * Return an RDD containing all pairs of elements with matching keys in this + * RDD<T> and the Geode `Region<K, V>`. The join key from RDD + * element is generated by `func(T) => K`, and the key from the Geode + * region is just the key of the key/value pair. + * + * Each pair of elements of result RDD will be returned as a (t, v) tuple, + * where t is from this RDD and v is from the Geode region. 
+ * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element T + * @param <K> the key type of the Geode region + * @param <V> the value type of the Geode region + * @return JavaPairRDD<T, V> + */ + public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) { + return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); + } + + /** + * Return an RDD containing all pairs of elements with matching keys in this + * RDD<T> and the Geode `Region<K, V>`. The join key from RDD + * element is generated by `func(T) => K`, and the key from the Geode + * region is just the key of the key/value pair. + * + * Each pair of elements of result RDD will be returned as a (t, v) tuple, + * where t is from this RDD and v is from the Geode region. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element T + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param <K> the key type of the Geode region + * @param <V> the value type of the Geode region + * @return JavaPairRDD<T, V> + */ + public <K, V> JavaPairRDD<T, V> joinGeodeRegion( + String regionPath, Function<T, K> func, GeodeConnectionConf connConf) { + GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf); + ClassTag<T> kt = fakeClassTag(); + ClassTag<V> vt = fakeClassTag(); + return new JavaPairRDD<>(rdd, kt, vt); + } + + /** + * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`. + * The join key from RDD element is generated by `func(T) => K`, and the + * key from region is just the key of the key/value pair. + * + * For each element (t) in this RDD, the resulting RDD will either contain + * all pairs (t, Some(v)) for v in the Geode region, or the pair + * (t, None) if no element in the Geode region has key `func(t)`. 
+ * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element T + * @param <K> the key type of the Geode region + * @param <V> the value type of the Geode region + * @return JavaPairRDD<T, Option<V>> + */ + public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) { + return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf()); + } + + /** + * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`. + * The join key from RDD element is generated by `func(T) => K`, and the + * key from region is just the key of the key/value pair. + * + * For each element (t) in this RDD, the resulting RDD will either contain + * all pairs (t, Some(v)) for v in the Geode region, or the pair + * (t, None) if no element in the Geode region has key `func(t)`. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element T + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param <K> the key type of the Geode region + * @param <V> the value type of the Geode region + * @return JavaPairRDD<T, Option<V>> + */ + public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion( + String regionPath, Function<T, K> func, GeodeConnectionConf connConf) { + GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf); + ClassTag<T> kt = fakeClassTag(); + ClassTag<Option<V>> vt = fakeClassTag(); + return new JavaPairRDD<>(rdd, kt, vt); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java ---------------------------------------------------------------------- diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java new file mode 100644 index 0000000..3471bf90 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.javaapi; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; + +/** + * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode + * OQL functionality. 
+ * + * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link + * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> + */ +public class GeodeJavaSQLContextFunctions { + + public final GeodeSQLContextFunctions scf; + + public GeodeJavaSQLContextFunctions(SQLContext sqlContext) { + scf = new GeodeSQLContextFunctions(sqlContext); + } + + public <T> DataFrame geodeOQL(String query) { + DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf()); + return df; + } + + public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) { + DataFrame df = scf.geodeOQL(query, connConf); + return df; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java new file mode 100644 index 0000000..ce6b1ff --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.javaapi; + + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD; +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$; +import org.apache.spark.SparkContext; +import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*; + +import scala.reflect.ClassTag; +import java.util.Properties; + +/** + * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode + * Connector functionality. + * + * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link + * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p> + */ +public class GeodeJavaSparkContextFunctions { + + public final SparkContext sc; + + public GeodeJavaSparkContextFunctions(SparkContext sc) { + this.sc = sc; + } + + /** + * Expose a Geode region as a JavaPairRDD + * @param regionPath the full path of the region + * @param connConf the GeodeConnectionConf that can be used to access the region + * @param opConf the parameters for this operation, such as preferred partitioner. 
+ */ + public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion( + String regionPath, GeodeConnectionConf connConf, Properties opConf) { + ClassTag<K> kt = fakeClassTag(); + ClassTag<V> vt = fakeClassTag(); + GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply( + sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt); + return new GeodeJavaRegionRDD<>(rdd); + } + + /** + * Expose a Geode region as a JavaPairRDD with default GeodeConnector and no preferred partitioner. + * @param regionPath the full path of the region + */ + public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) { + GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf()); + return geodeRegion(regionPath, connConf, new Properties()); + } + + /** + * Expose a Geode region as a JavaPairRDD with no preferred partitioner. + * @param regionPath the full path of the region + * @param connConf the GeodeConnectionConf that can be used to access the region + */ + public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) { + return geodeRegion(regionPath, connConf, new Properties()); + } + + /** + * Expose a Geode region as a JavaPairRDD with default GeodeConnector. + * @param regionPath the full path of the region + * @param opConf the parameters for this operation, such as preferred partitioner. 
+ */ + public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) { + GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf()); + return geodeRegion(regionPath, connConf, opConf); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f7eaa26c/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java new file mode 100644 index 0000000..41fe7e5 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.javaapi; + +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import scala.Tuple2; + +import io.pivotal.geode.spark.connector.package$; + +/** + * The main entry point to Spark Geode Connector Java API. + * + * There are several helpful static factory methods which build useful wrappers + * around Spark Context, Streaming Context and RDD. There are also helper methods + * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>. + */ +public final class GeodeJavaUtil { + + /** constants */ + public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey(); + // partitioner related keys and values + public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey(); + public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey(); + public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName(); + public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName(); + public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey(); + public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault(); + + /** The private constructor prevents users from creating instances of this class. */ + private GeodeJavaUtil() { } + + /** + * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based + * on an existing {@link SparkContext} instance. 
+ */ + public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) { + return new GeodeJavaSparkContextFunctions(sc); + } + + /** + * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based + * on an existing {@link JavaSparkContext} instance. + */ + public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) { + return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc)); + } + + /** + * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an + * existing {@link org.apache.spark.api.java.JavaPairRDD} instance. + */ + public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) { + return new GeodeJavaPairRDDFunctions<K, V>(rdd); + } + + /** + * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an + * existing {@link org.apache.spark.api.java.JavaRDD} instance. + */ + public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) { + return new GeodeJavaRDDFunctions<T>(rdd); + } + + /** + * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an + * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance. + */ + public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) { + return new GeodeJavaPairDStreamFunctions<>(ds); + } + + /** + * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an + * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance. + */ + public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) { + return new GeodeJavaDStreamFunctions<>(ds); + } + + /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<Tuple2<K, V>> + * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>. 
+ */ + public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) { + return JavaAPIHelper.toJavaPairRDD(rdd); + } + + /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<Tuple2<K, V>> + * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>. + */ + public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) { + return JavaAPIHelper.toJavaPairDStream(ds); + } + + /** + * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based + * on an existing {@link SQLContext} instance. + */ + public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) { + return new GeodeJavaSQLContextFunctions(sqlContext); + } + +}
