GEODE-37: change package name from io.pivotal.geode
(./geode-spark-connector/src/it/java/ittest/io/pivotal) to org.apache.geode
(./geode-spark-connector/src/it/java/ittest/org/apache)


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2d374c9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2d374c9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2d374c9d

Branch: refs/heads/develop
Commit: 2d374c9de5a48d7e08eb29594470aa62c3be3f87
Parents: 54cf6bf
Author: Hitesh Khamesra <hkhame...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hkhame...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700

----------------------------------------------------------------------
 .../pivotal/geode/spark/connector/Employee.java |  54 ---
 .../spark/connector/JavaApiIntegrationTest.java | 424 -------------------
 .../geode/spark/connector/Portfolio.java        | 109 -----
 .../pivotal/geode/spark/connector/Position.java |  73 ----
 .../apache/geode/spark/connector/Employee.java  |  54 +++
 .../spark/connector/JavaApiIntegrationTest.java | 424 +++++++++++++++++++
 .../apache/geode/spark/connector/Portfolio.java | 109 +++++
 .../apache/geode/spark/connector/Position.java  |  73 ++++
 8 files changed, 660 insertions(+), 660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
deleted file mode 100644
index 9fba9e1..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector;
-
-import java.io.Serializable;
-
-public class Employee implements Serializable {
-
-  private String name;
-
-  private int age;
-
-  public Employee(String n, int a) {
-    name = n;
-    age = a;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public int getAge() {
-    return age;
-  }
-
-  public String toString() {
-    return new StringBuilder().append("Employee[name=").append(name).
-            append(", age=").append(age).
-            append("]").toString();
-  }
-  
-  public boolean equals(Object o) {
-    if (o instanceof Employee) {
-      return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
-    }
-    return false;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
deleted file mode 100644
index f1577f3..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.distributed.ConfigurationProperties;
-import io.pivotal.geode.spark.connector.GeodeConnection;
-import io.pivotal.geode.spark.connector.GeodeConnectionConf;
-import io.pivotal.geode.spark.connector.GeodeConnectionConf$;
-import 
io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager$;
-import io.pivotal.geode.spark.connector.javaapi.GeodeJavaRegionRDD;
-import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster$;
-import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.scalatest.junit.JUnitSuite;
-import io.pivotal.geode.spark.connector.package$;
-import scala.Tuple2;
-import scala.Option;
-import scala.Some;
-import java.util.*;
-
-import static 
io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.RDDSaveBatchSizePropKey;
-import static 
io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.javaFunctions;
-import static org.junit.Assert.*;
-
-public class JavaApiIntegrationTest extends JUnitSuite {
-
-  static JavaSparkContext jsc = null;
-  static GeodeConnectionConf connConf = null;
-  
-  static int numServers = 2;
-  static int numObjects = 1000;
-  static String regionPath = "pr_str_int_region";
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    // start geode cluster, and spark context
-    Properties settings = new Properties();
-    settings.setProperty(ConfigurationProperties.CACHE_XML_FILE, 
"src/it/resources/test-retrieve-regions.xml");
-    settings.setProperty("num-of-servers", Integer.toString(numServers));
-    int locatorPort = GeodeCluster$.MODULE$.start(settings);
-
-    // start spark context in local mode
-    Properties props = new Properties();
-    props.put("log4j.logger.org.apache.spark", "INFO");
-    props.put("log4j.logger.io.pivotal.geode.spark.connector","DEBUG");
-    IOUtils.configTestLog4j("ERROR", props);
-    SparkConf conf = new SparkConf()
-            .setAppName("RetrieveRegionIntegrationTest")
-            .setMaster("local[2]")
-            .set(package$.MODULE$.GeodeLocatorPropKey(), "localhost:"+ 
locatorPort);
-    // sc = new SparkContext(conf);
-    jsc = new JavaSparkContext(conf);
-    connConf = GeodeConnectionConf.apply(jsc.getConf());
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    // stop connection, spark context, and geode cluster
-    
DefaultGeodeConnectionManager$.MODULE$.closeConnection(GeodeConnectionConf$.MODULE$.apply(jsc.getConf()));
-    jsc.stop();
-    GeodeCluster$.MODULE$.stop();
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   utility methods
-  // 
--------------------------------------------------------------------------------------------
-
-  private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) 
{
-    assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + 
list.toString(), map.size() == list.size());
-    for (Tuple2<K, V> p : list) {
-      assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + 
map.get(p._1()),
-                 p._2().equals(map.get(p._1())));
-    }
-  }
-
-  private Region<String, Integer> prepareStrIntRegion(String regionPath, int 
start, int stop) {
-    HashMap<String, Integer> entriesMap = new HashMap<>();
-    for (int i = start; i < stop; i ++) {
-      entriesMap.put("k_" + i, i);
-    }
-
-    GeodeConnection conn = connConf.getConnection();
-    Region<String, Integer> region = conn.getRegionProxy(regionPath);
-    region.removeAll(region.keySetOnServer());
-    region.putAll(entriesMap);
-    return region;
-  }
-
-  private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int 
stop) {
-    List<Tuple2<String, Integer>> data = new ArrayList<>();
-    for (int i = start; i < stop; i ++) {
-      data.add(new Tuple2<>("k_" + i, i));
-    }
-    return jsc.parallelizePairs(data);
-  }
-
-  private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, 
int stop) {
-    List<Tuple2<Integer, Integer>> data = new ArrayList<>();
-    for (int i = start; i < stop; i ++) {
-      data.add(new Tuple2<>(i, i * 2));
-    }
-    return jsc.parallelizePairs(data);
-  }
-
-  private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) {
-    List<Integer> data = new ArrayList<>();
-    for (int i = start; i < stop; i ++) {
-      data.add(i);
-    }
-    return jsc.parallelize(data);
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaRDD.saveToGeode
-  // 
--------------------------------------------------------------------------------------------
-
-  static class IntToStrIntPairFunction implements PairFunction<Integer, 
String, Integer> {
-    @Override public Tuple2<String, Integer> call(Integer x) throws Exception {
-      return new Tuple2<>("k_" + x, x);
-    }
-  }
-
-  @Test
-  public void testRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws 
Exception {
-    verifyRDDSaveToGeode(true, true);
-  }
-
-  @Test
-  public void testRDDSaveToGeodeWithDefaultConnConf() throws Exception {
-    verifyRDDSaveToGeode(true, false);
-  }
-  
-  @Test
-  public void testRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
-    verifyRDDSaveToGeode(false, true);
-  }
-
-  @Test
-  public void testRDDSaveToGeodeWithConnConf() throws Exception {
-    verifyRDDSaveToGeode(false, false);
-  }
-  
-  public void verifyRDDSaveToGeode(boolean useDefaultConnConf, boolean 
useOpConf) throws Exception {
-    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  
// remove all entries
-    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
-
-    PairFunction<Integer, String, Integer> func = new 
IntToStrIntPairFunction();
-    Properties opConf = new Properties();
-    opConf.put(RDDSaveBatchSizePropKey, "200");
-
-    if (useDefaultConnConf) {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, func, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath, func);
-    } else {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf);
-    }
-    
-    Set<String> keys = region.keySetOnServer();
-    Map<String, Integer> map = region.getAll(keys);
-
-    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
-
-    for (int i = 0; i < numObjects; i ++) {
-      expectedList.add(new Tuple2<>("k_" + i, i));
-    }
-    matchMapAndPairList(map, expectedList);
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaPairRDD.saveToGeode
-  // 
--------------------------------------------------------------------------------------------
-
-  @Test
-  public void testPairRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws 
Exception {
-    verifyPairRDDSaveToGeode(true, true);
-  }
-
-  @Test
-  public void testPairRDDSaveToGeodeWithDefaultConnConf() throws Exception {
-    verifyPairRDDSaveToGeode(true, false);
-  }
-  
-  @Test
-  public void testPairRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
-    verifyPairRDDSaveToGeode(false, true);
-  }
-
-  @Test
-  public void testPairRDDSaveToGeodeWithConnConf() throws Exception {
-    verifyPairRDDSaveToGeode(false, false);
-  }
-  
-  public void verifyPairRDDSaveToGeode(boolean useDefaultConnConf, boolean 
useOpConf) throws Exception {
-    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  
// remove all entries
-    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, 
numObjects);
-    Properties opConf = new Properties();
-    opConf.put(RDDSaveBatchSizePropKey, "200");
-
-    if (useDefaultConnConf) {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath);
-    } else {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, connConf, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath, connConf);
-    }
-
-    Set<String> keys = region.keySetOnServer();
-    Map<String, Integer> map = region.getAll(keys);
-
-    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
-    for (int i = 0; i < numObjects; i ++) {
-      expectedList.add(new Tuple2<>("k_" + i, i));
-    }
-    matchMapAndPairList(map, expectedList);
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaSparkContext.geodeRegion and where clause
-  // 
--------------------------------------------------------------------------------------------
-
-  @Test
-  public void testJavaSparkContextGeodeRegion() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);  // remove all entries
-    Properties emptyProps = new Properties();
-    GeodeJavaRegionRDD<String, Integer> rdd1 = 
javaFunctions(jsc).geodeRegion(regionPath);
-    GeodeJavaRegionRDD<String, Integer> rdd2 = 
javaFunctions(jsc).geodeRegion(regionPath, emptyProps);
-    GeodeJavaRegionRDD<String, Integer> rdd3 = 
javaFunctions(jsc).geodeRegion(regionPath, connConf);
-    GeodeJavaRegionRDD<String, Integer> rdd4 = 
javaFunctions(jsc).geodeRegion(regionPath, connConf, emptyProps);
-    GeodeJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 
50");
-
-    HashMap<String, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < numObjects; i ++) {
-      expectedMap.put("k_" + i, i);
-    }
-
-    matchMapAndPairList(expectedMap, rdd1.collect());
-    matchMapAndPairList(expectedMap, rdd2.collect());
-    matchMapAndPairList(expectedMap, rdd3.collect());
-    matchMapAndPairList(expectedMap, rdd4.collect());
-
-    HashMap<String, Integer> expectedMap2 = new HashMap<>();
-    for (int i = 0; i < 50; i ++) {
-      expectedMap2.put("k_" + i, i);
-    }
-
-    matchMapAndPairList(expectedMap2, rdd5.collect());
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaPairRDD.joinGeodeRegion
-  // 
--------------------------------------------------------------------------------------------
-
-  @Test
-  public void testPairRDDJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
-
-    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = 
javaFunctions(rdd1).joinGeodeRegion(regionPath);
-    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = 
javaFunctions(rdd1).joinGeodeRegion(regionPath, connConf);
-    // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + 
"\n=========================");
-
-    HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < 10; i ++) {
-      expectedMap.put(new Tuple2<>("k_" + i, i), i);
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, 
Integer>, String> {
-    @Override public String call(Tuple2<Integer, Integer> pair) throws 
Exception {
-      return "k_" + pair._1();
-    }
-  }
-
-  @Test
-  public void testPairRDDJoinWithDiffKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
-    Function<Tuple2<Integer, Integer>, String> func = new 
IntIntPairToStrKeyFunction();
-
-    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = 
javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
-    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = 
javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + 
"\n=========================");
-
-    HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < 10; i ++) {
-      expectedMap.put(new Tuple2<>(i, i * 2), i);
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaPairRDD.outerJoinGeodeRegion
-  // 
--------------------------------------------------------------------------------------------
-
-  @Test
-  public void testPairRDDOuterJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
-
-    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = 
javaFunctions(rdd1).outerJoinGeodeRegion(regionPath);
-    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = 
javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + 
"\n=========================");
-
-    HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new 
HashMap<>();
-    for (int i = -5; i < 10; i ++) {
-      if (i < 0)
-        expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) 
null));
-      else
-        expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i));
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  @Test
-  public void testPairRDDOuterJoinWithDiffKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
-    Function<Tuple2<Integer, Integer>, String> func = new 
IntIntPairToStrKeyFunction();
-
-    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = 
javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
-    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = 
javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + 
"\n=========================");
-
-    HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new 
HashMap<>();
-    for (int i = -5; i < 10; i ++) {
-      if (i < 0)
-        expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null));
-      else
-        expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i));
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaRDD.joinGeodeRegion
-  // 
--------------------------------------------------------------------------------------------
-
-  static class IntToStrKeyFunction implements Function<Integer, String> {
-    @Override public String call(Integer x) throws Exception {
-      return "k_" + x;
-    }
-  }
-
-  @Test
-  public void testRDDJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
-
-    Function<Integer, String> func = new IntToStrKeyFunction();
-    JavaPairRDD<Integer, Integer> rdd2a = 
javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
-    JavaPairRDD<Integer, Integer> rdd2b = 
javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + 
"\n=========================");
-
-    HashMap<Integer, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < 10; i ++) {
-      expectedMap.put(i, i);
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //   JavaRDD.outerJoinGeodeRegion
-  // 
--------------------------------------------------------------------------------------------
-
-  @Test
-  public void testRDDOuterJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
-
-    Function<Integer, String> func = new IntToStrKeyFunction();
-    JavaPairRDD<Integer, Option<Integer>> rdd2a = 
javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
-    JavaPairRDD<Integer, Option<Integer>> rdd2b = 
javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + 
"\n=========================");
-
-    HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>();
-    for (int i = -5; i < 10; i ++) {
-      if (i < 0)
-        expectedMap.put(i, Option.apply((Integer) null));
-      else
-        expectedMap.put(i, Some.apply(i));
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
deleted file mode 100644
index 63477eb..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector;
-
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.geode.cache.Declarable;
-
-/**
- * A stock portfolio that consists of multiple {@link Position} objects that
- * represent shares of stock (a "security").  Instances of
- * <code>Portfolio</code> can be stored in a Geode <code>Region</code> and
- * their contents can be queried using the Geode query service.
- * </p>
- * This class is <code>Serializable</code> because we want it to be distributed
- * to multiple members of a distributed system.  Because this class is
- * <code>Declarable</code>, we can describe instances of it in a Geode
- * <code>cache.xml</code> file.
- * </p>
- *
- */
-public class Portfolio implements Declarable, Serializable {
-
-  private static final long serialVersionUID = 9097335119586059309L;
-
-  private int id;  /* id is used as the entry key and is stored in the entry */
-  private String type;
-  private Map<String,Position> positions = new 
LinkedHashMap<String,Position>();
-  private String status;
-
-  public Portfolio(Properties props) {
-    init(props);
-  }
-
-  @Override
-  public void init(Properties props) {
-    this.id = Integer.parseInt(props.getProperty("id"));
-    this.type = props.getProperty("type", "type1");
-    this.status = props.getProperty("status", "active");
-
-    // get the positions. These are stored in the properties object
-    // as Positions, not String, so use Hashtable protocol to get at them.
-    // the keys are named "positionN", where N is an integer.
-    for (Map.Entry<Object, Object> entry: props.entrySet()) {
-      String key = (String)entry.getKey();
-      if (key.startsWith("position")) {
-        Position pos = (Position)entry.getValue();
-        this.positions.put(pos.getSecId(), pos);
-      }
-    }
-  }
-
-  public void setType(String t) {this.type = t; }
-
-  public String getStatus(){
-    return status;
-  }
-
-  public int getId(){
-    return this.id;
-  }
-
-  public Map<String,Position> getPositions(){
-    return this.positions;
-  }
-
-  public String getType() {
-    return this.type;
-  }
-
-  public boolean isActive(){
-    return status.equals("active");
-  }
-
-  @Override
-  public String toString(){
-    StringBuilder buf = new StringBuilder();
-    buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status);
-    buf.append(" type=" + this.type);
-    boolean firstTime = true;
-    for (Map.Entry<String, Position> entry: positions.entrySet()) {
-      if (!firstTime) {
-        buf.append(", ");
-      }
-      buf.append("\n\t\t");
-      buf.append(entry.getKey() + ":" + entry.getValue());
-      firstTime = false;
-    }
-    buf.append("]");
-    return buf.toString();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
deleted file mode 100644
index 7c99ef7..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.io.pivotal.geode.spark.connector;
-
-import java.io.Serializable;
-import java.util.Properties;
-import org.apache.geode.cache.Declarable;
-
-/**
- * Represents a number of shares of a stock ("security") held in a {@link
- * Portfolio}.
- * </p>
- * This class is <code>Serializable</code> because we want it to be distributed
- * to multiple members of a distributed system.  Because this class is
- * <code>Declarable</code>, we can describe instances of it in a Geode
- * <code>cache.xml</code> file.
- * </p>
- *
- */
-public class Position implements Declarable, Serializable {
-
-  private static final long serialVersionUID = -8229531542107983344L;
-
-  private String secId;
-  private double qty;
-  private double mktValue;
-
-  public Position(Properties props) {
-    init(props);
-  }
-
-  @Override
-  public void init(Properties props) {
-    this.secId = props.getProperty("secId");
-    this.qty = Double.parseDouble(props.getProperty("qty"));
-    this.mktValue = Double.parseDouble(props.getProperty("mktValue"));
-  }
-
-  public String getSecId(){
-    return this.secId;
-  }
-
-  public double getQty(){
-    return this.qty;
-  }
-
-  public double getMktValue() {
-    return this.mktValue;
-  }
-
-  @Override
-  public String toString(){
-    return new StringBuilder()
-            .append("Position [secId=").append(secId)
-            .append(" qty=").append(this.qty)
-            .append(" mktValue=").append(mktValue).append("]").toString();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java
new file mode 100644
index 0000000..9fba9e1
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+
+public class Employee implements Serializable {
+
+  private String name;
+
+  private int age;
+
+  public Employee(String n, int a) {
+    name = n;
+    age = a;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public String toString() {
+    return new StringBuilder().append("Employee[name=").append(name).
+            append(", age=").append(age).
+            append("]").toString();
+  }
+  
+  public boolean equals(Object o) {
+    if (o instanceof Employee) {
+      return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
+    }
+    return false;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
new file mode 100644
index 0000000..f1577f3
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.ConfigurationProperties;
+import io.pivotal.geode.spark.connector.GeodeConnection;
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeConnectionConf$;
+import 
io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager$;
+import io.pivotal.geode.spark.connector.javaapi.GeodeJavaRegionRDD;
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster$;
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.scalatest.junit.JUnitSuite;
+import io.pivotal.geode.spark.connector.package$;
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+import java.util.*;
+
+import static 
io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.RDDSaveBatchSizePropKey;
+import static 
io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.javaFunctions;
+import static org.junit.Assert.*;
+
/**
 * Java-API integration tests for the Geode Spark connector.  A small Geode
 * cluster and a local[2] Spark context are started once for the whole class
 * and torn down at the end.  The tests exercise the javaFunctions(...) entry
 * points: saveToGeode on JavaRDD/JavaPairRDD (with and without explicit
 * connection and operation configuration), geodeRegion retrieval with a
 * where-clause, and the join/outerJoin operations between RDDs and a region.
 *
 * NOTE(review): the package declaration is still ittest.io.pivotal... even
 * though this diff places the file under ittest/org/apache/... — the package
 * does not match the new directory and will not compile there; presumably the
 * package statements are renamed in a follow-up commit. TODO confirm.
 */
public class JavaApiIntegrationTest extends JUnitSuite {

  // Shared by every test; created in setUpBeforeClass(), released in
  // tearDownAfterClass().
  static JavaSparkContext jsc = null;
  static GeodeConnectionConf connConf = null;

  static int numServers = 2;
  static int numObjects = 1000;
  static String regionPath = "pr_str_int_region";

  /**
   * Starts a Geode cluster (numServers servers, region definitions from the
   * cache xml below) and a local[2] Spark context pointed at its locator, then
   * builds the default GeodeConnectionConf from the Spark conf.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // start geode cluster, and spark context
    Properties settings = new Properties();
    settings.setProperty(ConfigurationProperties.CACHE_XML_FILE, "src/it/resources/test-retrieve-regions.xml");
    settings.setProperty("num-of-servers", Integer.toString(numServers));
    int locatorPort = GeodeCluster$.MODULE$.start(settings);

    // start spark context in local mode
    Properties props = new Properties();
    props.put("log4j.logger.org.apache.spark", "INFO");
    props.put("log4j.logger.io.pivotal.geode.spark.connector","DEBUG");
    IOUtils.configTestLog4j("ERROR", props);
    SparkConf conf = new SparkConf()
            .setAppName("RetrieveRegionIntegrationTest")
            .setMaster("local[2]")
            .set(package$.MODULE$.GeodeLocatorPropKey(), "localhost:"+ locatorPort);
    // sc = new SparkContext(conf);
    jsc = new JavaSparkContext(conf);
    connConf = GeodeConnectionConf.apply(jsc.getConf());
  }

  /**
   * Releases everything set up above, in dependency order.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    // stop connection, spark context, and geode cluster
    DefaultGeodeConnectionManager$.MODULE$.closeConnection(GeodeConnectionConf$.MODULE$.apply(jsc.getConf()));
    jsc.stop();
    GeodeCluster$.MODULE$.stop();
  }

  // --------------------------------------------------------------------------------------------
  //   utility methods
  // --------------------------------------------------------------------------------------------

  // Asserts that the map and the list of pairs hold exactly the same
  // (key, value) associations: same size, and each list entry's value equals
  // the map's value for that key.
  private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) {
    assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size());
    for (Tuple2<K, V> p : list) {
      assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()),
                 p._2().equals(map.get(p._1())));
    }
  }

  // Clears the region, then seeds it with entries ("k_<i>" -> i) for i in
  // [start, stop); returns a client proxy for the region.  Calling with
  // start == stop just empties the region.
  private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) {
    HashMap<String, Integer> entriesMap = new HashMap<>();
    for (int i = start; i < stop; i ++) {
      entriesMap.put("k_" + i, i);
    }

    GeodeConnection conn = connConf.getConnection();
    Region<String, Integer> region = conn.getRegionProxy(regionPath);
    region.removeAll(region.keySetOnServer());
    region.putAll(entriesMap);
    return region;
  }

  // Builds a JavaPairRDD of ("k_<i>", i) for i in [start, stop).
  private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) {
    List<Tuple2<String, Integer>> data = new ArrayList<>();
    for (int i = start; i < stop; i ++) {
      data.add(new Tuple2<>("k_" + i, i));
    }
    return jsc.parallelizePairs(data);
  }

  // Builds a JavaPairRDD of (i, i * 2) for i in [start, stop).
  private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) {
    List<Tuple2<Integer, Integer>> data = new ArrayList<>();
    for (int i = start; i < stop; i ++) {
      data.add(new Tuple2<>(i, i * 2));
    }
    return jsc.parallelizePairs(data);
  }

  // Builds a JavaRDD of the integers in [start, stop).
  private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) {
    List<Integer> data = new ArrayList<>();
    for (int i = start; i < stop; i ++) {
      data.add(i);
    }
    return jsc.parallelize(data);
  }

  // --------------------------------------------------------------------------------------------
  //   JavaRDD.saveToGeode
  // --------------------------------------------------------------------------------------------

  // Key extractor used when saving a plain JavaRDD<Integer> into the
  // String-keyed region: maps x to the pair ("k_<x>", x).
  static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> {
    @Override public Tuple2<String, Integer> call(Integer x) throws Exception {
      return new Tuple2<>("k_" + x, x);
    }
  }

  @Test
  public void testRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
    verifyRDDSaveToGeode(true, true);
  }

  @Test
  public void testRDDSaveToGeodeWithDefaultConnConf() throws Exception {
    verifyRDDSaveToGeode(true, false);
  }

  @Test
  public void testRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
    verifyRDDSaveToGeode(false, true);
  }

  @Test
  public void testRDDSaveToGeodeWithConnConf() throws Exception {
    verifyRDDSaveToGeode(false, false);
  }

  /**
   * Saves a JavaRDD<Integer> of numObjects values into an emptied region via
   * one of the four saveToGeode overloads (chosen by the two flags), then
   * verifies the region holds exactly the expected ("k_<i>", i) entries.
   *
   * @param useDefaultConnConf use the overload without an explicit GeodeConnectionConf
   * @param useOpConf          pass an operation-config Properties (batch size 200)
   */
  public void verifyRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);

    PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction();
    Properties opConf = new Properties();
    opConf.put(RDDSaveBatchSizePropKey, "200");

    if (useDefaultConnConf) {
      if (useOpConf)
        javaFunctions(rdd1).saveToGeode(regionPath, func, opConf);
      else
        javaFunctions(rdd1).saveToGeode(regionPath, func);
    } else {
      if (useOpConf)
        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf, opConf);
      else
        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf);
    }

    Set<String> keys = region.keySetOnServer();
    Map<String, Integer> map = region.getAll(keys);

    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();

    for (int i = 0; i < numObjects; i ++) {
      expectedList.add(new Tuple2<>("k_" + i, i));
    }
    matchMapAndPairList(map, expectedList);
  }

  // --------------------------------------------------------------------------------------------
  //   JavaPairRDD.saveToGeode
  // --------------------------------------------------------------------------------------------

  @Test
  public void testPairRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
    verifyPairRDDSaveToGeode(true, true);
  }

  @Test
  public void testPairRDDSaveToGeodeWithDefaultConnConf() throws Exception {
    verifyPairRDDSaveToGeode(true, false);
  }

  @Test
  public void testPairRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
    verifyPairRDDSaveToGeode(false, true);
  }

  @Test
  public void testPairRDDSaveToGeodeWithConnConf() throws Exception {
    verifyPairRDDSaveToGeode(false, false);
  }

  /**
   * Same as verifyRDDSaveToGeode, but for a JavaPairRDD<String, Integer>
   * (no key-extractor function is needed for the pair overloads).
   */
  public void verifyPairRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects);
    Properties opConf = new Properties();
    opConf.put(RDDSaveBatchSizePropKey, "200");

    if (useDefaultConnConf) {
      if (useOpConf)
        javaFunctions(rdd1).saveToGeode(regionPath, opConf);
      else
        javaFunctions(rdd1).saveToGeode(regionPath);
    } else {
      if (useOpConf)
        javaFunctions(rdd1).saveToGeode(regionPath, connConf, opConf);
      else
        javaFunctions(rdd1).saveToGeode(regionPath, connConf);
    }

    Set<String> keys = region.keySetOnServer();
    Map<String, Integer> map = region.getAll(keys);

    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
    for (int i = 0; i < numObjects; i ++) {
      expectedList.add(new Tuple2<>("k_" + i, i));
    }
    matchMapAndPairList(map, expectedList);
  }

  // --------------------------------------------------------------------------------------------
  //   JavaSparkContext.geodeRegion and where clause
  // --------------------------------------------------------------------------------------------

  /**
   * Retrieves the seeded region through all four geodeRegion overloads and
   * checks each returns every entry; also checks a where-clause RDD returns
   * only the entries whose value is below 50.
   */
  @Test
  public void testJavaSparkContextGeodeRegion() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);  // clear, then seed numObjects entries
    Properties emptyProps = new Properties();
    GeodeJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).geodeRegion(regionPath);
    GeodeJavaRegionRDD<String, Integer> rdd2 = javaFunctions(jsc).geodeRegion(regionPath, emptyProps);
    GeodeJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).geodeRegion(regionPath, connConf);
    GeodeJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).geodeRegion(regionPath, connConf, emptyProps);
    GeodeJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50");

    HashMap<String, Integer> expectedMap = new HashMap<>();
    for (int i = 0; i < numObjects; i ++) {
      expectedMap.put("k_" + i, i);
    }

    matchMapAndPairList(expectedMap, rdd1.collect());
    matchMapAndPairList(expectedMap, rdd2.collect());
    matchMapAndPairList(expectedMap, rdd3.collect());
    matchMapAndPairList(expectedMap, rdd4.collect());

    HashMap<String, Integer> expectedMap2 = new HashMap<>();
    for (int i = 0; i < 50; i ++) {
      expectedMap2.put("k_" + i, i);
    }

    matchMapAndPairList(expectedMap2, rdd5.collect());
  }

  // --------------------------------------------------------------------------------------------
  //   JavaPairRDD.joinGeodeRegion
  // --------------------------------------------------------------------------------------------

  /**
   * Inner join where the pair RDD's key type already matches the region key
   * type: only the pairs with i in [0, 10) find a region entry, so pairs with
   * negative i are dropped.
   */
  @Test
  public void testPairRDDJoinWithSameKeyType() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);
    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);

    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath);
    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, connConf);
    // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");

    HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>();
    for (int i = 0; i < 10; i ++) {
      expectedMap.put(new Tuple2<>("k_" + i, i), i);
    }
    matchMapAndPairList(expectedMap, rdd2a.collect());
    matchMapAndPairList(expectedMap, rdd2b.collect());
  }

  // Derives the region key "k_<first>" from an (Integer, Integer) pair, for
  // joins where the RDD key type differs from the region key type.
  static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> {
    @Override public String call(Tuple2<Integer, Integer> pair) throws Exception {
      return "k_" + pair._1();
    }
  }

  /**
   * Inner join with a key-mapping function: (i, i*2) pairs are joined on the
   * derived key "k_<i>"; only i in [0, 10) match region entries.
   */
  @Test
  public void testPairRDDJoinWithDiffKeyType() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);
    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();

    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");

    HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>();
    for (int i = 0; i < 10; i ++) {
      expectedMap.put(new Tuple2<>(i, i * 2), i);
    }
    matchMapAndPairList(expectedMap, rdd2a.collect());
    matchMapAndPairList(expectedMap, rdd2b.collect());
  }

  // --------------------------------------------------------------------------------------------
  //   JavaPairRDD.outerJoinGeodeRegion
  // --------------------------------------------------------------------------------------------

  /**
   * Outer join keeps every pair: unmatched keys (negative i) map to None,
   * matched keys to Some(value).
   */
  @Test
  public void testPairRDDOuterJoinWithSameKeyType() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);
    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);

    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath);
    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, connConf);
    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");

    HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>();
    for (int i = -5; i < 10; i ++) {
      if (i < 0)
        expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null));
      else
        expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i));
    }
    matchMapAndPairList(expectedMap, rdd2a.collect());
    matchMapAndPairList(expectedMap, rdd2b.collect());
  }

  /**
   * Outer join with a key-mapping function; same None/Some expectations as
   * above, but on (i, i*2) pairs keyed via "k_<i>".
   */
  @Test
  public void testPairRDDOuterJoinWithDiffKeyType() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);
    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();

    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");

    HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>();
    for (int i = -5; i < 10; i ++) {
      if (i < 0)
        expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null));
      else
        expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i));
    }
    matchMapAndPairList(expectedMap, rdd2a.collect());
    matchMapAndPairList(expectedMap, rdd2b.collect());
  }

  // --------------------------------------------------------------------------------------------
  //   JavaRDD.joinGeodeRegion
  // --------------------------------------------------------------------------------------------

  // Derives the region key "k_<x>" from a plain Integer element.
  static class IntToStrKeyFunction implements Function<Integer, String> {
    @Override public String call(Integer x) throws Exception {
      return "k_" + x;
    }
  }

  /**
   * Inner join of a plain JavaRDD<Integer> against the region via a key
   * function; only elements in [0, 10) match.
   */
  @Test
  public void testRDDJoinWithSameKeyType() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);
    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);

    Function<Integer, String> func = new IntToStrKeyFunction();
    JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
    JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");

    HashMap<Integer, Integer> expectedMap = new HashMap<>();
    for (int i = 0; i < 10; i ++) {
      expectedMap.put(i, i);
    }
    matchMapAndPairList(expectedMap, rdd2a.collect());
    matchMapAndPairList(expectedMap, rdd2b.collect());
  }

  // --------------------------------------------------------------------------------------------
  //   JavaRDD.outerJoinGeodeRegion
  // --------------------------------------------------------------------------------------------

  /**
   * Outer join of a plain JavaRDD<Integer>: unmatched elements (negative i)
   * map to None, matched elements to Some(i).
   */
  @Test
  public void testRDDOuterJoinWithSameKeyType() throws Exception {
    prepareStrIntRegion(regionPath, 0, numObjects);
    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);

    Function<Integer, String> func = new IntToStrKeyFunction();
    JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
    JavaPairRDD<Integer, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");

    HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>();
    for (int i = -5; i < 10; i ++) {
      if (i < 0)
        expectedMap.put(i, Option.apply((Integer) null));
      else
        expectedMap.put(i, Some.apply(i));
    }
    matchMapAndPairList(expectedMap, rdd2a.collect());
    matchMapAndPairList(expectedMap, rdd2b.collect());
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
new file mode 100644
index 0000000..63477eb
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.geode.cache.Declarable;
+
+/**
+ * A stock portfolio that consists of multiple {@link Position} objects that
+ * represent shares of stock (a "security").  Instances of
+ * <code>Portfolio</code> can be stored in a Geode <code>Region</code> and
+ * their contents can be queried using the Geode query service.
+ * </p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system.  Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a Geode
+ * <code>cache.xml</code> file.
+ * </p>
+ *
+ */
+public class Portfolio implements Declarable, Serializable {
+
+  private static final long serialVersionUID = 9097335119586059309L;
+
+  private int id;  /* id is used as the entry key and is stored in the entry */
+  private String type;
+  private Map<String,Position> positions = new 
LinkedHashMap<String,Position>();
+  private String status;
+
+  public Portfolio(Properties props) {
+    init(props);
+  }
+
+  @Override
+  public void init(Properties props) {
+    this.id = Integer.parseInt(props.getProperty("id"));
+    this.type = props.getProperty("type", "type1");
+    this.status = props.getProperty("status", "active");
+
+    // get the positions. These are stored in the properties object
+    // as Positions, not String, so use Hashtable protocol to get at them.
+    // the keys are named "positionN", where N is an integer.
+    for (Map.Entry<Object, Object> entry: props.entrySet()) {
+      String key = (String)entry.getKey();
+      if (key.startsWith("position")) {
+        Position pos = (Position)entry.getValue();
+        this.positions.put(pos.getSecId(), pos);
+      }
+    }
+  }
+
+  public void setType(String t) {this.type = t; }
+
+  public String getStatus(){
+    return status;
+  }
+
+  public int getId(){
+    return this.id;
+  }
+
+  public Map<String,Position> getPositions(){
+    return this.positions;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public boolean isActive(){
+    return status.equals("active");
+  }
+
+  @Override
+  public String toString(){
+    StringBuilder buf = new StringBuilder();
+    buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status);
+    buf.append(" type=" + this.type);
+    boolean firstTime = true;
+    for (Map.Entry<String, Position> entry: positions.entrySet()) {
+      if (!firstTime) {
+        buf.append(", ");
+      }
+      buf.append("\n\t\t");
+      buf.append(entry.getKey() + ":" + entry.getValue());
+      firstTime = false;
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d374c9d/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
new file mode 100644
index 0000000..7c99ef7
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+import java.util.Properties;
+import org.apache.geode.cache.Declarable;
+
+/**
+ * Represents a number of shares of a stock ("security") held in a {@link
+ * Portfolio}.
+ * </p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system.  Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a Geode
+ * <code>cache.xml</code> file.
+ * </p>
+ *
+ */
+public class Position implements Declarable, Serializable {
+
+  private static final long serialVersionUID = -8229531542107983344L;
+
+  private String secId;
+  private double qty;
+  private double mktValue;
+
+  public Position(Properties props) {
+    init(props);
+  }
+
+  @Override
+  public void init(Properties props) {
+    this.secId = props.getProperty("secId");
+    this.qty = Double.parseDouble(props.getProperty("qty"));
+    this.mktValue = Double.parseDouble(props.getProperty("mktValue"));
+  }
+
+  public String getSecId(){
+    return this.secId;
+  }
+
+  public double getQty(){
+    return this.qty;
+  }
+
+  public double getMktValue() {
+    return this.mktValue;
+  }
+
+  @Override
+  public String toString(){
+    return new StringBuilder()
+            .append("Position [secId=").append(secId)
+            .append(" qty=").append(this.qty)
+            .append(" mktValue=").append(mktValue).append("]").toString();
+  }
+}
+

Reply via email to