GEODE-120 Add batch size to RDD.saveToGemfire()

Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/70448c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/70448c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/70448c5d

Branch: refs/heads/develop
Commit: 70448c5dffeb29ae285720b904cfc04f2ef377ec
Parents: 89b9aaf
Author: Qihong Chen <qc...@pivotal.io>
Authored: Wed Jul 15 15:34:30 2015 -0700
Committer: Qihong Chen <qc...@pivotal.io>
Committed: Fri Jul 17 11:08:01 2015 -0700

----------------------------------------------------------------------
 gemfire-spark-connector/doc/6_save_rdd.md       | 22 +++++-
 .../spark/connector/JavaApiIntegrationTest.java | 72 +++++++++++++++-----
 .../javaapi/GemFireJavaDStreamFunctions.java    | 32 ++++++++-
 .../GemFireJavaPairDStreamFunctions.java        | 26 ++++++-
 .../javaapi/GemFireJavaPairRDDFunctions.java    | 51 +++++++++-----
 .../javaapi/GemFireJavaRDDFunctions.java        | 48 ++++++++++---
 .../connector/javaapi/GemFireJavaUtil.java      |  3 +-
 .../connector/GemFirePairRDDFunctions.scala     |  8 ++-
 .../spark/connector/GemFireRDDFunctions.scala   | 16 +++--
 .../rdd/GemFireRDDPartitionerImpl.scala         |  2 +-
 .../internal/rdd/GemFireRDDWriter.scala         | 45 +++++++-----
 .../spark/connector/javaapi/JavaAPIHelper.scala |  2 +
 .../gemfire/spark/connector/package.scala       |  3 +
 .../streaming/GemFireDStreamFunctions.scala     | 24 +++++--
 .../connector/GemFireRDDFunctionsTest.scala     | 30 +++++++-
 15 files changed, 301 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/doc/6_save_rdd.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/6_save_rdd.md 
b/gemfire-spark-connector/doc/6_save_rdd.md
index 1ebc027..004ef62 100644
--- a/gemfire-spark-connector/doc/6_save_rdd.md
+++ b/gemfire-spark-connector/doc/6_save_rdd.md
@@ -57,5 +57,25 @@ rdd2.saveToGemfire("str_int_region", e => (e, e.length))
 // rdd2.saveToGemfire("rgnb", e => (e, e.length), connConf)
 ```
 
- 
+### `rdd.save.batch.size` 
+
+The connector invokes the GemFire API `putAll()` to save the data. To make
+`putAll()` more efficient, the connector invokes `putAll()` once for every
+10,000 entries by default. This batch size can be changed with the optional
+parameter `opConf`. The following shows how to do it:
+
+```
+  // in Scala
+  rdd.saveToGemfire(regionPath, opConf = Map(RDDSaveBatchSizePropKey -> "5000"))
+
+  // in Java
+  Properties opConf = new Properties();
+  opConf.put(RDDSaveBatchSizePropKey, "5000");
+  ...
+  javaFunctions(rdd).saveToGemfire(regionPath, opConf); 
+   
+  // note: RDDSaveBatchSizePropKey = "rdd.save.batch.size" 
+```
+
+
 Next: [Saving DStream to Geode](7_save_dstream.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
 
b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
index 8357d8f..bbbef61 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
@@ -25,6 +25,7 @@ import scala.Some;
 
 import java.util.*;
 
+import static 
io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.RDDSaveBatchSizePropKey;
 import static 
io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.javaFunctions;
 import static org.junit.Assert.*;
 
@@ -127,25 +128,45 @@ public class JavaApiIntegrationTest extends JUnitSuite {
   }
 
   @Test
+  public void testRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws 
Exception {
+    verifyRDDSaveToGemfire(true, true);
+  }
+
+  @Test
   public void testRDDSaveToGemfireWithDefaultConnConf() throws Exception {
-    verifyRDDSaveToGemfire(true);
+    verifyRDDSaveToGemfire(true, false);
+  }
+  
+  @Test
+  public void testRDDSaveToGemfireWithConnConfAndOpConf() throws Exception {
+    verifyRDDSaveToGemfire(false, true);
   }
 
   @Test
   public void testRDDSaveToGemfireWithConnConf() throws Exception {
-    verifyRDDSaveToGemfire(false);
+    verifyRDDSaveToGemfire(false, false);
   }
-
-  public void verifyRDDSaveToGemfire(boolean useDefaultConnConf) throws 
Exception {
+  
+  public void verifyRDDSaveToGemfire(boolean useDefaultConnConf, boolean 
useOpConf) throws Exception {
     Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  
// remove all entries
     JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
 
     PairFunction<Integer, String, Integer> func = new 
IntToStrIntPairFunction();
-    if (useDefaultConnConf)
-      javaFunctions(rdd1).saveToGemfire(regionPath, func);
-    else
-      javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf);
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
 
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, func);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf);
+    }
+    
     Set<String> keys = region.keySetOnServer();
     Map<String, Integer> map = region.getAll(keys);
 
@@ -162,23 +183,42 @@ public class JavaApiIntegrationTest extends JUnitSuite {
   // 
--------------------------------------------------------------------------------------------
 
   @Test
+  public void testPairRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws 
Exception {
+    verifyPairRDDSaveToGemfire(true, true);
+  }
+
+  @Test
   public void testPairRDDSaveToGemfireWithDefaultConnConf() throws Exception {
-    verifyPairRDDSaveToGemfire(true);
+    verifyPairRDDSaveToGemfire(true, false);
+  }
+  
+  @Test
+  public void testPairRDDSaveToGemfireWithConnConfAndOpConf() throws Exception 
{
+    verifyPairRDDSaveToGemfire(false, true);
   }
 
   @Test
   public void testPairRDDSaveToGemfireWithConnConf() throws Exception {
-    verifyPairRDDSaveToGemfire(false);
+    verifyPairRDDSaveToGemfire(false, false);
   }
-
-  public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf) throws 
Exception {
+  
+  public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf, boolean 
useOpConf) throws Exception {
     Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  
// remove all entries
     JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, 
numObjects);
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
 
-    if (useDefaultConnConf)
-      javaFunctions(rdd1).saveToGemfire(regionPath);
-    else
-      javaFunctions(rdd1).saveToGemfire(regionPath, connConf);
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, connConf);
+    }
 
     Set<String> keys = region.keySetOnServer();
     Map<String, Integer> map = region.getAll(keys);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
index 951d6c9..80b396c 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
@@ -4,6 +4,9 @@ import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
 import io.pivotal.gemfire.spark.connector.streaming.GemFireDStreamFunctions;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import java.util.Properties;
+
+import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
 
 /**
  * A Java API wrapper over {@link 
org.apache.spark.streaming.api.java.JavaDStream}
@@ -25,10 +28,33 @@ public class GemFireJavaDStreamFunctions<T> {
    * @param regionPath the full path of region that the DStream is stored  
    * @param func the PairFunction that converts elements of JavaDStream to 
key/value pairs
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the optional parameters for this operation
+   */
+  public <K, V> void saveToGemfire(
+    String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf 
connConf, Properties opConf) {
+    dsf.saveToGemfire(regionPath, func, connConf, 
propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the JavaDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored  
+   * @param func the PairFunction that converts elements of JavaDStream to 
key/value pairs
+   * @param opConf the optional parameters for this operation
+   */
+  public <K, V> void saveToGemfire(
+          String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+    dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), 
propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the JavaDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored
+   * @param func the PairFunction that converts elements of JavaDStream to 
key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
    */
   public <K, V> void saveToGemfire(
-    String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf 
connConf) {
-    dsf.saveToGemfire(regionPath, func, connConf);
+          String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf 
connConf) {
+    dsf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap());
   }
 
   /**
@@ -38,7 +64,7 @@ public class GemFireJavaDStreamFunctions<T> {
    */
   public <K, V> void saveToGemfire(
           String regionPath, PairFunction<T, K, V> func) {
-    dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf());
+    dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), 
emptyStrStrMap());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
index 3b43a65..060c3e0 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
@@ -3,6 +3,9 @@ package io.pivotal.gemfire.spark.connector.javaapi;
 import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
 import 
io.pivotal.gemfire.spark.connector.streaming.GemFirePairDStreamFunctions;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import java.util.Properties;
+
+import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
 
 /**
  * A Java API wrapper over {@link 
org.apache.spark.streaming.api.java.JavaPairDStream}
@@ -23,9 +26,28 @@ public class GemFireJavaPairDStreamFunctions<K, V> {
    * Save the JavaPairDStream to GemFire key-value store.
    * @param regionPath the full path of region that the DStream is stored  
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the optional parameters for this operation
    */  
+  public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, 
Properties opConf) {
+    dsf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the JavaPairDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored  
+   * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   */
   public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) 
{
-    dsf.saveToGemfire(regionPath, connConf);
+    dsf.saveToGemfire(regionPath, connConf, emptyStrStrMap());
+  }
+
+  /**
+   * Save the JavaPairDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored
+   * @param opConf the optional parameters for this operation
+   */
+  public void saveToGemfire(String regionPath, Properties opConf) {
+    dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), 
propertiesToScalaMap(opConf));
   }
 
   /**
@@ -33,7 +55,7 @@ public class GemFireJavaPairDStreamFunctions<K, V> {
    * @param regionPath the full path of region that the DStream is stored
    */
   public void saveToGemfire(String regionPath) {
-    dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf());
+    dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), 
emptyStrStrMap());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
index 609cdbf..20b5af2 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
@@ -10,6 +10,8 @@ import scala.Option;
 import scala.Tuple2;
 import scala.reflect.ClassTag;
 
+import java.util.Properties;
+
 import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
 
 /**
@@ -31,9 +33,28 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    * Save the pair RDD to GemFire key-value store.
    * @param regionPath the full path of region that the RDD is stored
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the parameters for this operation
+   */
+  public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, 
Properties opConf) {
+    rddf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored
+   * @param opConf the parameters for this operation
+   */
+  public void saveToGemfire(String regionPath, Properties opConf) {
+    rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), 
propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored
+   * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
    */
   public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) 
{
-    rddf.saveToGemfire(regionPath, connConf);
+    rddf.saveToGemfire(regionPath, connConf, emptyStrStrMap());
   }
 
   /**
@@ -41,7 +62,7 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    * @param regionPath the full path of region that the RDD is stored
    */
   public void saveToGemfire(String regionPath) {
-    rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf());
+    rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), 
emptyStrStrMap());
   }
 
   /**
@@ -51,7 +72,7 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    * (k, v2) is in the GemFire region.
    *
    * @param regionPath the region path of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;&lt;K, V>, V2>
    */  
   public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(String 
regionPath) {
@@ -66,7 +87,7 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    *
    * @param regionPath the region path of the GemFire region
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
-   * @tparam V2 the value type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;&lt;K, V>, V2>
    */
   public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
@@ -88,8 +109,8 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    *
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element (K, V)
-   * @tparam K2 the key type of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <K2> the key type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
    */
   public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
@@ -109,8 +130,8 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element (K, V)
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
-   * @tparam K2 the key type of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <K2> the key type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
    */  
   public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
@@ -128,8 +149,7 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    * ((k, v), None)) if no element in the GemFire region have key k.
    *
    * @param regionPath the region path of the GemFire region
-   * @tparam K2 the key type of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
    */
   public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> 
outerJoinGemfireRegion(String regionPath) {
@@ -144,8 +164,7 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    *
    * @param regionPath the region path of the GemFire region
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
-   * @tparam K2 the key type of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
    */  
   public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
@@ -167,8 +186,8 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    *
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element (K, V)
-   * @tparam K2 the key type of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <K2> the key type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
    */
   public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
@@ -188,8 +207,8 @@ public class GemFireJavaPairRDDFunctions<K, V> {
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element (K, V)
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
-   * @tparam K2 the key type of the GemFire region
-   * @tparam V2 the value type of the GemFire region
+   * @param <K2> the key type of the GemFire region
+   * @param <V2> the value type of the GemFire region
    * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
    */
   public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
index ccdabb7..9bf35c1 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
@@ -11,6 +11,8 @@ import org.apache.spark.api.java.function.PairFunction;
 import scala.Option;
 import scala.reflect.ClassTag;
 
+import java.util.Properties;
+
 import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
 
 /**
@@ -33,9 +35,33 @@ public class GemFireJavaRDDFunctions<T> {
    * @param regionPath the full path of region that the RDD is stored  
    * @param func the PairFunction that converts elements of JavaRDD to 
key/value pairs
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the parameters for this operation
    */  
-  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> 
func, GemFireConnectionConf connConf) {
-    rddf.saveToGemfire(regionPath, func, connConf);
+  public <K, V> void saveToGemfire(
+    String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf 
connConf, Properties opConf) {
+    rddf.saveToGemfire(regionPath, func, connConf, 
propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the non-pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored  
+   * @param func the PairFunction that converts elements of JavaRDD to 
key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   */
+  public <K, V> void saveToGemfire(
+    String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf 
connConf) {
+    rddf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap());
+  }
+
+  /**
+   * Save the non-pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored
+   * @param func the PairFunction that converts elements of JavaRDD to 
key/value pairs
+   * @param opConf the parameters for this operation
+   */
+  public <K, V> void saveToGemfire(
+    String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+    rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), 
propertiesToScalaMap(opConf));
   }
 
   /**
@@ -44,7 +70,7 @@ public class GemFireJavaRDDFunctions<T> {
    * @param func the PairFunction that converts elements of JavaRDD to 
key/value pairs
    */
   public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> 
func) {
-    rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf());
+    rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), 
emptyStrStrMap());
   }
 
   /**
@@ -58,8 +84,8 @@ public class GemFireJavaRDDFunctions<T> {
    *
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element T
-   * @tparam K the key type of the GemFire region
-   * @tparam V the value type of the GemFire region
+   * @param <K> the key type of the GemFire region
+   * @param <V> the value type of the GemFire region
    * @return JavaPairRDD&lt;T, V>
    */
   public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, 
Function<T, K> func) {
@@ -78,8 +104,8 @@ public class GemFireJavaRDDFunctions<T> {
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element T
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
-   * @tparam K the key type of the GemFire region
-   * @tparam V the value type of the GemFire region
+   * @param <K> the key type of the GemFire region
+   * @param <V> the value type of the GemFire region
    * @return JavaPairRDD&lt;T, V>
    */
   public <K, V> JavaPairRDD<T, V> joinGemfireRegion(
@@ -101,8 +127,8 @@ public class GemFireJavaRDDFunctions<T> {
    *
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element T
-   * @tparam K the key type of the GemFire region
-   * @tparam V the value type of the GemFire region
+   * @param <K> the key type of the GemFire region
+   * @param <V> the value type of the GemFire region
    * @return JavaPairRDD&lt;T, Option&lt;V>>
    */
   public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String 
regionPath, Function<T, K> func) {
@@ -121,8 +147,8 @@ public class GemFireJavaRDDFunctions<T> {
    * @param regionPath the region path of the GemFire region
    * @param func the function that generates region key from RDD element T
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
-   * @tparam K the key type of the GemFire region
-   * @tparam V the value type of the GemFire region
+   * @param <K> the key type of the GemFire region
+   * @param <V> the value type of the GemFire region
    * @return JavaPairRDD&lt;T, Option&lt;V>>
    */
   public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
index ff11588..5e0c928 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
@@ -27,7 +27,8 @@ public final class GemFireJavaUtil {
   public static String NumberPartitionsPerServerPropKey = 
package$.MODULE$.NumberPartitionsPerServerPropKey();
   public static String OnePartitionPartitionerName = 
package$.MODULE$.OnePartitionPartitionerName();
   public static String ServerSplitsPartitionerName = 
package$.MODULE$.ServerSplitsPartitionerName();
-
+  public static String RDDSaveBatchSizePropKey = 
package$.MODULE$.RDDSaveBatchSizePropKey();
+  public static int RDDSaveBatchSizeDefault = 
package$.MODULE$.RDDSaveBatchSizeDefault();
   
   /** The private constructor is used prevents user from creating instance of 
this class. */
   private GemFireJavaUtil() { }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
index 9fb1c04..86ec596 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
@@ -16,11 +16,15 @@ class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) 
extends Serializable w
    * Save the RDD of pairs to GemFire key-value store without any conversion
    * @param regionPath the full path of region that the RDD is stored
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the optional parameters for this operation
    */
-  def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = 
defaultConnectionConf): Unit = {
+  def saveToGemfire(
+      regionPath: String, 
+      connConf: GemFireConnectionConf = defaultConnectionConf, 
+      opConf: Map[String, String] = Map.empty): Unit = {    
     connConf.getConnection.validateRegion[K, V](regionPath)
     logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
-    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
+    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf)
     rdd.sparkContext.runJob(rdd, writer.write _)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
index 4ffacc5..3aa1ebd 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
@@ -17,18 +17,26 @@ class GemFireRDDFunctions[T](val rdd: RDD[T]) extends 
Serializable with Logging
    * @param regionPath the full path of region that the RDD is stored  
    * @param func the function that converts elements of RDD to key/value pairs
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the optional parameters for this operation
    */
-  def saveToGemfire[K, V](regionPath: String, func: T => (K, V), connConf: 
GemFireConnectionConf = defaultConnectionConf): Unit = {
+  def saveToGemfire[K, V](
+      regionPath: String, 
+      func: T => (K, V), 
+      connConf: GemFireConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
     connConf.getConnection.validateRegion[K, V](regionPath)
     logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
-    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf)
+    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf)
     rdd.sparkContext.runJob(rdd, writer.write(func) _)
   }
 
   /** This version of saveToGemfire(...) is just for Java API. */
   private[connector] def saveToGemfire[K, V](
-    regionPath: String, func: PairFunction[T, K, V], connConf: 
GemFireConnectionConf): Unit = {
-    saveToGemfire[K, V](regionPath, func.call _, connConf)
+      regionPath: String, 
+      func: PairFunction[T, K, V], 
+      connConf: GemFireConnectionConf, 
+      opConf: Map[String, String]): Unit = {
+    saveToGemfire[K, V](regionPath, func.call _, connConf, opConf)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
index 0c9c34f..bc1a791 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
@@ -30,7 +30,7 @@ object ServerSplitsPartitioner extends GemFireRDDPartitioner {
   override def partitions[K: ClassTag, V: ClassTag]
   (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): 
Array[Partition] = {
     if (md == null) throw new RuntimeException("RegionMetadata is null")
-    val n = env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt
+    val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } 
catch { case e: NumberFormatException => 2 }
     if (!md.isPartitioned || md.getServerBucketMap == null || 
md.getServerBucketMap.isEmpty)
       Array[Partition](new GemFireRDDPartition(0, Set.empty))
     else {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
index 573902b..11e1e07 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
@@ -1,15 +1,18 @@
 package io.pivotal.gemfire.spark.connector.internal.rdd
 
 import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import io.pivotal.gemfire.spark.connector._
 import org.apache.spark.{Logging, TaskContext}
 
 import scala.collection.Iterator
-import collection.JavaConversions._
+import java.util.{HashMap => JMap}
 
 /** This trait provide some common code for pair and non-pair RDD writer */
-private[rdd] trait GemFireRDDWriterTraceUtils {
-  
+private[rdd] abstract class GemFireRDDWriterBase (opConf: Map[String, String]) 
extends Serializable {
+
+  val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, 
RDDSaveBatchSizeDefault.toString).toInt}
+                  catch { case e: NumberFormatException => 
RDDSaveBatchSizeDefault }
+
   def mapDump(map: Map[_, _], num: Int): String = {
     val firstNum = map.take(num + 1)
     if (firstNum.size > num) s"$firstNum ..." else s"$firstNum"    
@@ -21,16 +24,20 @@ private[rdd] trait GemFireRDDWriterTraceUtils {
  * Those functions will be executed on Spark executors.
  * @param regionPath the full path of the region where the data is written to
  */
-class GemFireRDDWriter[T, K, V]
-(regionPath: String, connConf: GemFireConnectionConf) extends Serializable 
with GemFireRDDWriterTraceUtils with Logging {
+class GemFireRDDWriter[T, K, V] 
+  (regionPath: String, connConf: GemFireConnectionConf, opConf: Map[String, 
String] = Map.empty)
+  extends GemFireRDDWriterBase(opConf) with Serializable with Logging {
 
   def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): 
Unit = {
     val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, 
V](regionPath)
-    // todo. optimize batch size of putAll
-    val map: Map[K, V] = data.map(func).toMap
-    region.putAll(map)
-    logDebug(s"${map.size} entries are saved to region $regionPath")
-    logTrace(mapDump(map, 10))
+    var count = 0
+    val chunks = data.grouped(batchSize)
+    chunks.foreach { chunk =>
+      val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = 
func(t); m.put(k, v); m}
+      region.putAll(map)
+      count += chunk.length
+    }
+    logDebug(s"$count entries (batch.size = $batchSize) are saved to region 
$regionPath")
   }
 }
 
@@ -41,15 +48,19 @@ class GemFireRDDWriter[T, K, V]
  * @param regionPath the full path of the region where the data is written to
  */
 class GemFirePairRDDWriter[K, V]
-(regionPath: String, connConf: GemFireConnectionConf) extends Serializable 
with GemFireRDDWriterTraceUtils with Logging {
+  (regionPath: String, connConf: GemFireConnectionConf, opConf: Map[String, 
String] = Map.empty)
+  extends GemFireRDDWriterBase(opConf) with Serializable with Logging {
 
   def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = {
     val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, 
V](regionPath)
-    // todo. optimize batch size of putAll
-    val map: Map[K, V] = data.toMap
-    region.putAll(map)
-    logDebug(s"${map.size} entries are saved to region $regionPath")
-    logTrace(mapDump(map, 10))
+    var count = 0
+    val chunks = data.grouped(batchSize)
+    chunks.foreach { chunk =>
+      val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => 
m.put(k,v); m}
+      region.putAll(map)
+      count += chunk.length
+    }
+    logDebug(s"$count entries (batch.size = $batchSize) are saved to region 
$regionPath")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
index cf7b250..11de6f1 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
@@ -32,4 +32,6 @@ private[connector] object JavaAPIHelper {
   def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] =
     JavaPairDStream.fromJavaDStream(ds)
 
+  /** an empty Map[String, String] for default opConf **/
+  val emptyStrStrMap: Map[String, String] = Map.empty
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
index 834d6a5..72a5bb1 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
@@ -22,6 +22,9 @@ package object connector {
   final val OnePartitionPartitionerName = OnePartitionPartitioner.name
   final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name
 
+  final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
+  final val RDDSaveBatchSizeDefault = 10000
+  
   implicit def toSparkContextFunctions(sc: SparkContext): 
GemFireSparkContextFunctions =
     new GemFireSparkContextFunctions(sc)
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
index 91eb784..f064498 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
@@ -18,18 +18,26 @@ class GemFireDStreamFunctions[T](val dstream: DStream[T]) 
extends Serializable w
    * @param regionPath the full path of region that the DStream is stored
    * @param func the function that converts elements of the DStream to 
key/value pairs
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the optional parameters for this operation
    */
   def saveToGemfire[K, V](
-    regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = 
defaultConnectionConf): Unit = {
+      regionPath: String, 
+      func: T => (K, V), 
+      connConf: GemFireConnectionConf = defaultConnectionConf, 
+      opConf: Map[String, String] = Map.empty): Unit = {
     connConf.getConnection.validateRegion[K, V](regionPath)
-    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf)
+    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf)
     logInfo(s"""Save DStream region=$regionPath 
conn=${connConf.locators.mkString(",")}""")
     dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) 
_))
   }
 
   /** this version of saveToGemfire is just for Java API */
-  def saveToGemfire[K, V](regionPath: String, func: PairFunction[T, K, V], 
connConf: GemFireConnectionConf): Unit = {
-    saveToGemfire[K, V](regionPath, func.call _, connConf)
+  def saveToGemfire[K, V](
+      regionPath: String,
+      func: PairFunction[T, K, V],
+      connConf: GemFireConnectionConf,
+      opConf: Map[String, String] ): Unit = {
+    saveToGemfire[K, V](regionPath, func.call _, connConf, opConf)
   }
 
   private[connector] def defaultConnectionConf: GemFireConnectionConf =
@@ -48,10 +56,14 @@ class GemFirePairDStreamFunctions[K, V](val dstream: 
DStream[(K,V)]) extends Ser
    * Save the DStream of pairs to GemFire key-value store without any 
conversion
    * @param regionPath the full path of region that the DStream is stored
    * @param connConf the GemFireConnectionConf object that provides connection 
to GemFire cluster
+   * @param opConf the optional parameters for this operation
    */
-  def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = 
defaultConnectionConf): Unit = {
+  def saveToGemfire(
+      regionPath: String, 
+      connConf: GemFireConnectionConf = defaultConnectionConf, 
+      opConf: Map[String, String] = Map.empty): Unit = {
     connConf.getConnection.validateRegion[K, V](regionPath)
-    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
+    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf)
     logInfo(s"""Save DStream region=$regionPath 
conn=${connConf.locators.mkString(",")}""")
     dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
index 659fca2..fdf5ff1 100644
--- 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
@@ -61,30 +61,54 @@ class GemFireRDDFunctionsTest extends FunSuite with 
Matchers with MockitoSugar {
   }
   
   test("test PairRDDFunctions.saveToGemfire") {
+    verifyPairRDDFunction(useOpConf = false)
+  }
+
+  test("test PairRDDFunctions.saveToGemfire w/ opConf") {
+    verifyPairRDDFunction(useOpConf = true)
+  }
+  
+  def verifyPairRDDFunction(useOpConf: Boolean): Unit = {
     import io.pivotal.gemfire.spark.connector._
     val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
     val mockRDD = mock[RDD[(String, String)]]
     val mockSparkContext = mock[SparkContext]
     when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    val result = mockRDD.saveToGemfire(regionPath, mockConnConf)
+    val result = 
+      if (useOpConf) 
+        mockRDD.saveToGemfire(regionPath, mockConnConf, 
Map(RDDSaveBatchSizePropKey -> "5000"))
+      else
+        mockRDD.saveToGemfire(regionPath, mockConnConf)
     verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
     result === Unit
     verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
       mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => 
Unit])(mockAny(classOf[ClassTag[Unit]]))
 
     // Note: current implementation make following code not compilable
-    //       so not negative test for this case   
+    //       so no negative test for this case
     //  val rdd: RDD[(K, V)] = ...
     //  rdd.saveToGemfire(regionPath, s => (s.length, s))
   }
 
   test("test RDDFunctions.saveToGemfire") {
+    verifyRDDFunction(useOpConf = false)
+  }
+
+  test("test RDDFunctions.saveToGemfire w/ opConf") {
+    verifyRDDFunction(useOpConf = true)
+  }
+  
+  def verifyRDDFunction(useOpConf: Boolean): Unit = {
     import io.pivotal.gemfire.spark.connector._
     val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[Int, String]("test")
     val mockRDD = mock[RDD[(String)]]
     val mockSparkContext = mock[SparkContext]
     when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    val result = mockRDD.saveToGemfire(regionPath, s => (s.length, s), 
mockConnConf)
+    val result = 
+      if (useOpConf)
+        mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf, 
Map(RDDSaveBatchSizePropKey -> "5000"))
+      else
+        mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf)
     verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
     result === Unit
     verify(mockSparkContext, times(1)).runJob[String, Unit](


Reply via email to