GEODE-120 Add batch size to RDD.saveToGemfire()
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/70448c5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/70448c5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/70448c5d Branch: refs/heads/develop Commit: 70448c5dffeb29ae285720b904cfc04f2ef377ec Parents: 89b9aaf Author: Qihong Chen <qc...@pivotal.io> Authored: Wed Jul 15 15:34:30 2015 -0700 Committer: Qihong Chen <qc...@pivotal.io> Committed: Fri Jul 17 11:08:01 2015 -0700 ---------------------------------------------------------------------- gemfire-spark-connector/doc/6_save_rdd.md | 22 +++++- .../spark/connector/JavaApiIntegrationTest.java | 72 +++++++++++++++----- .../javaapi/GemFireJavaDStreamFunctions.java | 32 ++++++++- .../GemFireJavaPairDStreamFunctions.java | 26 ++++++- .../javaapi/GemFireJavaPairRDDFunctions.java | 51 +++++++++----- .../javaapi/GemFireJavaRDDFunctions.java | 48 ++++++++++--- .../connector/javaapi/GemFireJavaUtil.java | 3 +- .../connector/GemFirePairRDDFunctions.scala | 8 ++- .../spark/connector/GemFireRDDFunctions.scala | 16 +++-- .../rdd/GemFireRDDPartitionerImpl.scala | 2 +- .../internal/rdd/GemFireRDDWriter.scala | 45 +++++++----- .../spark/connector/javaapi/JavaAPIHelper.scala | 2 + .../gemfire/spark/connector/package.scala | 3 + .../streaming/GemFireDStreamFunctions.scala | 24 +++++-- .../connector/GemFireRDDFunctionsTest.scala | 30 +++++++- 15 files changed, 301 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/doc/6_save_rdd.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/6_save_rdd.md b/gemfire-spark-connector/doc/6_save_rdd.md index 1ebc027..004ef62 100644 --- a/gemfire-spark-connector/doc/6_save_rdd.md +++ 
b/gemfire-spark-connector/doc/6_save_rdd.md @@ -57,5 +57,25 @@ rdd2.saveToGemfire("str_int_region", e => (e, e.length)) // rdd2.saveToGemfire("rgnb", e => (e, e.length), connConf) ``` - +### `rdd.save.batch.size` + +The connector calls the GemFire API `putAll()` to save the data. To make +`putAll()` more efficient, the connector groups the entries and calls `putAll()` +once for every 10,000 entries by default. The batch size, which must be a positive +integer, can be changed with the optional parameter `opConf`, as shown below: + +``` + // in Scala + rdd.saveToGemfire(regionPath, opConf = Map(RDDSaveBatchSizePropKey -> "5000")) + + // in Java + Properties opConf = new Properties(); + opConf.put(RDDSaveBatchSizePropKey, "5000"); + ... + javaFunctions(rdd).saveToGemfire(regionPath, opConf); + + // note: RDDSaveBatchSizePropKey = "rdd.save.batch.size" +``` + + Next: [Saving DStream to Geode](7_save_dstream.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java index 8357d8f..bbbef61 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java +++ b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java @@ -25,6 +25,7 @@ import scala.Some; import java.util.*; +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.RDDSaveBatchSizePropKey; import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.javaFunctions; import static org.junit.Assert.*; @@
-127,25 +128,45 @@ public class JavaApiIntegrationTest extends JUnitSuite { } @Test + public void testRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception { + verifyRDDSaveToGemfire(true, true); + } + + @Test public void testRDDSaveToGemfireWithDefaultConnConf() throws Exception { - verifyRDDSaveToGemfire(true); + verifyRDDSaveToGemfire(true, false); + } + + @Test + public void testRDDSaveToGemfireWithConnConfAndOpConf() throws Exception { + verifyRDDSaveToGemfire(false, true); } @Test public void testRDDSaveToGemfireWithConnConf() throws Exception { - verifyRDDSaveToGemfire(false); + verifyRDDSaveToGemfire(false, false); } - - public void verifyRDDSaveToGemfire(boolean useDefaultConnConf) throws Exception { + + public void verifyRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception { Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects); PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction(); - if (useDefaultConnConf) - javaFunctions(rdd1).saveToGemfire(regionPath, func); - else - javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf); + Properties opConf = new Properties(); + opConf.put(RDDSaveBatchSizePropKey, "200"); + if (useDefaultConnConf) { + if (useOpConf) + javaFunctions(rdd1).saveToGemfire(regionPath, func, opConf); + else + javaFunctions(rdd1).saveToGemfire(regionPath, func); + } else { + if (useOpConf) + javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf, opConf); + else + javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf); + } + Set<String> keys = region.keySetOnServer(); Map<String, Integer> map = region.getAll(keys); @@ -162,23 +183,42 @@ public class JavaApiIntegrationTest extends JUnitSuite { // -------------------------------------------------------------------------------------------- @Test + public void testPairRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws 
Exception { + verifyPairRDDSaveToGemfire(true, true); + } + + @Test public void testPairRDDSaveToGemfireWithDefaultConnConf() throws Exception { - verifyPairRDDSaveToGemfire(true); + verifyPairRDDSaveToGemfire(true, false); + } + + @Test + public void testPairRDDSaveToGemfireWithConnConfAndOpConf() throws Exception { + verifyPairRDDSaveToGemfire(false, true); } @Test public void testPairRDDSaveToGemfireWithConnConf() throws Exception { - verifyPairRDDSaveToGemfire(false); + verifyPairRDDSaveToGemfire(false, false); } - - public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf) throws Exception { + + public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception { Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects); + Properties opConf = new Properties(); + opConf.put(RDDSaveBatchSizePropKey, "200"); - if (useDefaultConnConf) - javaFunctions(rdd1).saveToGemfire(regionPath); - else - javaFunctions(rdd1).saveToGemfire(regionPath, connConf); + if (useDefaultConnConf) { + if (useOpConf) + javaFunctions(rdd1).saveToGemfire(regionPath, opConf); + else + javaFunctions(rdd1).saveToGemfire(regionPath); + } else { + if (useOpConf) + javaFunctions(rdd1).saveToGemfire(regionPath, connConf, opConf); + else + javaFunctions(rdd1).saveToGemfire(regionPath, connConf); + } Set<String> keys = region.keySetOnServer(); Map<String, Integer> map = region.getAll(keys); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java index 951d6c9..80b396c 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java @@ -4,6 +4,9 @@ import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; import io.pivotal.gemfire.spark.connector.streaming.GemFireDStreamFunctions; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; +import java.util.Properties; + +import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; /** * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream} @@ -25,10 +28,33 @@ public class GemFireJavaDStreamFunctions<T> { * @param regionPath the full path of region that the DStream is stored * @param func the PairFunction that converts elements of JavaDStream to key/value pairs * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the optional parameters for this operation + */ + public <K, V> void saveToGemfire( + String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf, Properties opConf) { + dsf.saveToGemfire(regionPath, func, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaDStream to GemFire key-value store. 
+ * @param regionPath the full path of region that the DStream is stored + * @param func the PairFunction that converts elements of JavaDStream to key/value pairs + * @param opConf the optional parameters for this operation + */ + public <K, V> void saveToGemfire( + String regionPath, PairFunction<T, K, V> func, Properties opConf) { + dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaDStream to GemFire key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param func the PairFunction that converts elements of JavaDStream to key/value pairs + * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster */ public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { - dsf.saveToGemfire(regionPath, func, connConf); + String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { + dsf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap()); } /** @@ -38,7 +64,7 @@ public class GemFireJavaDStreamFunctions<T> { */ public <K, V> void saveToGemfire( String regionPath, PairFunction<T, K, V> func) { - dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf()); + dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap()); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java index 3b43a65..060c3e0 
100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java @@ -3,6 +3,9 @@ package io.pivotal.gemfire.spark.connector.javaapi; import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; import io.pivotal.gemfire.spark.connector.streaming.GemFirePairDStreamFunctions; import org.apache.spark.streaming.api.java.JavaPairDStream; +import java.util.Properties; + +import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; /** * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream} @@ -23,9 +26,28 @@ public class GemFireJavaPairDStreamFunctions<K, V> { * Save the JavaPairDStream to GemFire key-value store. * @param regionPath the full path of region that the DStream is stored * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the optional parameters for this operation */ + public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, Properties opConf) { + dsf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the JavaPairDStream to GemFire key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + */ public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) { - dsf.saveToGemfire(regionPath, connConf); + dsf.saveToGemfire(regionPath, connConf, emptyStrStrMap()); + } + + /** + * Save the JavaPairDStream to GemFire key-value store. 
+ * @param regionPath the full path of region that the DStream is stored + * @param opConf the optional parameters for this operation + */ + public void saveToGemfire(String regionPath, Properties opConf) { + dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); } /** @@ -33,7 +55,7 @@ public class GemFireJavaPairDStreamFunctions<K, V> { * @param regionPath the full path of region that the DStream is stored */ public void saveToGemfire(String regionPath) { - dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf()); + dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap()); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java index 609cdbf..20b5af2 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java @@ -10,6 +10,8 @@ import scala.Option; import scala.Tuple2; import scala.reflect.ClassTag; +import java.util.Properties; + import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; /** @@ -31,9 +33,28 @@ public class GemFireJavaPairRDDFunctions<K, V> { * Save the pair RDD to GemFire key-value store. 
* @param regionPath the full path of region that the RDD is stored * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the parameters for this operation + */ + public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, Properties opConf) { + rddf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the pair RDD to GemFire key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param opConf the parameters for this operation + */ + public void saveToGemfire(String regionPath, Properties opConf) { + rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); + } + + /** + * Save the pair RDD to GemFire key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster */ public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) { - rddf.saveToGemfire(regionPath, connConf); + rddf.saveToGemfire(regionPath, connConf, emptyStrStrMap()); } /** @@ -41,7 +62,7 @@ public class GemFireJavaPairRDDFunctions<K, V> { * @param regionPath the full path of region that the RDD is stored */ public void saveToGemfire(String regionPath) { - rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf()); + rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap()); } /** @@ -51,7 +72,7 @@ public class GemFireJavaPairRDDFunctions<K, V> { * (k, v2) is in the GemFire region. 
* * @param regionPath the region path of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<<K, V>, V2> */ public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(String regionPath) { @@ -66,7 +87,7 @@ public class GemFireJavaPairRDDFunctions<K, V> { * * @param regionPath the region path of the GemFire region * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam V2 the value type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<<K, V>, V2> */ public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion( @@ -88,8 +109,8 @@ public class GemFireJavaPairRDDFunctions<K, V> { * * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element (K, V) - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <K2> the key type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<Tuple2<K, V>, V2> */ public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion( @@ -109,8 +130,8 @@ public class GemFireJavaPairRDDFunctions<K, V> { * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element (K, V) * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <K2> the key type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<Tuple2<K, V>, V2> */ public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion( @@ -128,8 +149,7 @@ public class GemFireJavaPairRDDFunctions<K, V> { * ((k, v), None)) if no element in the GemFire region have key k. 
* * @param regionPath the region path of the GemFire region - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<Tuple2<K, V>, Option<V>> */ public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(String regionPath) { @@ -144,8 +164,7 @@ public class GemFireJavaPairRDDFunctions<K, V> { * * @param regionPath the region path of the GemFire region * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<Tuple2<K, V>, Option<V>> */ public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion( @@ -167,8 +186,8 @@ public class GemFireJavaPairRDDFunctions<K, V> { * * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element (K, V) - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <K2> the key type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<Tuple2<K, V>, Option<V>> */ public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion( @@ -188,8 +207,8 @@ public class GemFireJavaPairRDDFunctions<K, V> { * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element (K, V) * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region + * @param <K2> the key type of the GemFire region + * @param <V2> the value type of the GemFire region * @return JavaPairRDD<Tuple2<K, V>, Option<V>> */ public <K2, V2> JavaPairRDD<Tuple2<K, V>, 
Option<V2>> outerJoinGemfireRegion( http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java index ccdabb7..9bf35c1 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java @@ -11,6 +11,8 @@ import org.apache.spark.api.java.function.PairFunction; import scala.Option; import scala.reflect.ClassTag; +import java.util.Properties; + import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; /** @@ -33,9 +35,33 @@ public class GemFireJavaRDDFunctions<T> { * @param regionPath the full path of region that the RDD is stored * @param func the PairFunction that converts elements of JavaRDD to key/value pairs * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the parameters for this operation */ - public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { - rddf.saveToGemfire(regionPath, func, connConf); + public <K, V> void saveToGemfire( + String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf, Properties opConf) { + rddf.saveToGemfire(regionPath, func, connConf, propertiesToScalaMap(opConf)); + } + + /** + * Save the non-pair RDD to GemFire key-value store. 
+ * @param regionPath the full path of region that the RDD is stored + * @param func the PairFunction that converts elements of JavaRDD to key/value pairs + * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + */ + public <K, V> void saveToGemfire( + String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { + rddf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap()); + } + + /** + * Save the non-pair RDD to GemFire key-value store. + * @param regionPath the full path of region that the RDD is stored + * @param func the PairFunction that converts elements of JavaRDD to key/value pairs + * @param opConf the parameters for this operation + */ + public <K, V> void saveToGemfire( + String regionPath, PairFunction<T, K, V> func, Properties opConf) { + rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); } /** @@ -44,7 +70,7 @@ public class GemFireJavaRDDFunctions<T> { * @param func the PairFunction that converts elements of JavaRDD to key/value pairs */ public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) { - rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf()); + rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap()); } /** @@ -58,8 +84,8 @@ public class GemFireJavaRDDFunctions<T> { * * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element T - * @tparam K the key type of the GemFire region - * @tparam V the value type of the GemFire region + * @param <K> the key type of the GemFire region + * @param <V> the value type of the GemFire region * @return JavaPairRDD<T, V> */ public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) { @@ -78,8 +104,8 @@ public class GemFireJavaRDDFunctions<T> { * @param regionPath the region path of the GemFire region * @param func the function that 
generates region key from RDD element T * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K the key type of the GemFire region - * @tparam V the value type of the GemFire region + * @param <K> the key type of the GemFire region + * @param <V> the value type of the GemFire region * @return JavaPairRDD<T, V> */ public <K, V> JavaPairRDD<T, V> joinGemfireRegion( @@ -101,8 +127,8 @@ public class GemFireJavaRDDFunctions<T> { * * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element T - * @tparam K the key type of the GemFire region - * @tparam V the value type of the GemFire region + * @param <K> the key type of the GemFire region + * @param <V> the value type of the GemFire region * @return JavaPairRDD<T, Option<V>> */ public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) { @@ -121,8 +147,8 @@ public class GemFireJavaRDDFunctions<T> { * @param regionPath the region path of the GemFire region * @param func the function that generates region key from RDD element T * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K the key type of the GemFire region - * @tparam V the value type of the GemFire region + * @param <K> the key type of the GemFire region + * @param <V> the value type of the GemFire region * @return JavaPairRDD<T, Option<V>> */ public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion( http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java index ff11588..5e0c928 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java @@ -27,7 +27,8 @@ public final class GemFireJavaUtil { public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey(); public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName(); public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName(); - + public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey(); + public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault(); /** The private constructor is used prevents user from creating instance of this class. 
*/ private GemFireJavaUtil() { } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala index 9fb1c04..86ec596 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala @@ -16,11 +16,15 @@ class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable w * Save the RDD of pairs to GemFire key-value store without any conversion * @param regionPath the full path of region that the RDD is stored * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the optional parameters for this operation */ - def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = { + def saveToGemfire( + regionPath: String, + connConf: GemFireConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { connConf.getConnection.validateRegion[K, V](regionPath) logInfo(s"Save RDD id=${rdd.id} to region $regionPath") - val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf) + val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf) rdd.sparkContext.runJob(rdd, writer.write _) } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala index 4ffacc5..3aa1ebd 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala @@ -17,18 +17,26 @@ class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging * @param regionPath the full path of region that the RDD is stored * @param func the function that converts elements of RDD to key/value pairs * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the optional parameters for this operation */ - def saveToGemfire[K, V](regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = { + def saveToGemfire[K, V]( + regionPath: String, + func: T => (K, V), + connConf: GemFireConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { connConf.getConnection.validateRegion[K, V](regionPath) logInfo(s"Save RDD id=${rdd.id} to region $regionPath") - val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf) + val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf) rdd.sparkContext.runJob(rdd, writer.write(func) _) } /** This version of saveToGemfire(...) is just for Java API. 
*/ private[connector] def saveToGemfire[K, V]( - regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = { - saveToGemfire[K, V](regionPath, func.call _, connConf) + regionPath: String, + func: PairFunction[T, K, V], + connConf: GemFireConnectionConf, + opConf: Map[String, String]): Unit = { + saveToGemfire[K, V](regionPath, func.call _, connConf, opConf) } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala index 0c9c34f..bc1a791 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala @@ -30,7 +30,7 @@ object ServerSplitsPartitioner extends GemFireRDDPartitioner { override def partitions[K: ClassTag, V: ClassTag] (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = { if (md == null) throw new RuntimeException("RegionMetadata is null") - val n = env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt + val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 } if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty) Array[Partition](new GemFireRDDPartition(0, Set.empty)) else { 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala index 573902b..11e1e07 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala @@ -1,15 +1,18 @@ package io.pivotal.gemfire.spark.connector.internal.rdd import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf +import io.pivotal.gemfire.spark.connector._ import org.apache.spark.{Logging, TaskContext} import scala.collection.Iterator -import collection.JavaConversions._ +import java.util.{HashMap => JMap} /** This trait provide some common code for pair and non-pair RDD writer */ -private[rdd] trait GemFireRDDWriterTraceUtils { - +private[rdd] abstract class GemFireRDDWriterBase (opConf: Map[String, String]) extends Serializable { + + val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, RDDSaveBatchSizeDefault.toString).toInt} + catch { case e: NumberFormatException => RDDSaveBatchSizeDefault } + def mapDump(map: Map[_, _], num: Int): String = { val firstNum = map.take(num + 1) if (firstNum.size > num) s"$firstNum ..." else s"$firstNum" @@ -21,16 +24,20 @@ private[rdd] trait GemFireRDDWriterTraceUtils { * Those functions will be executed on Spark executors. 
* @param regionPath the full path of the region where the data is written to */ -class GemFireRDDWriter[T, K, V] -(regionPath: String, connConf: GemFireConnectionConf) extends Serializable with GemFireRDDWriterTraceUtils with Logging { +class GemFireRDDWriter[T, K, V] + (regionPath: String, connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty) + extends GemFireRDDWriterBase(opConf) with Serializable with Logging { def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = { val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) - // todo. optimize batch size of putAll - val map: Map[K, V] = data.map(func).toMap - region.putAll(map) - logDebug(s"${map.size} entries are saved to region $regionPath") - logTrace(mapDump(map, 10)) + var count = 0 + val chunks = data.grouped(batchSize) + chunks.foreach { chunk => + val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = func(t); m.put(k, v); m} + region.putAll(map) + count += chunk.length + } + logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath") } } @@ -41,15 +48,19 @@ class GemFireRDDWriter[T, K, V] * @param regionPath the full path of the region where the data is written to */ class GemFirePairRDDWriter[K, V] -(regionPath: String, connConf: GemFireConnectionConf) extends Serializable with GemFireRDDWriterTraceUtils with Logging { + (regionPath: String, connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty) + extends GemFireRDDWriterBase(opConf) with Serializable with Logging { def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = { val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) - // todo. 
optimize batch size of putAll - val map: Map[K, V] = data.toMap - region.putAll(map) - logDebug(s"${map.size} entries are saved to region $regionPath") - logTrace(mapDump(map, 10)) + var count = 0 + val chunks = data.grouped(batchSize) + chunks.foreach { chunk => + val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => m.put(k,v); m} + region.putAll(map) + count += chunk.length + } + logDebug(s"$count entries (batch.batch = $batchSize) are saved to region $regionPath") } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala index cf7b250..11de6f1 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala @@ -32,4 +32,6 @@ private[connector] object JavaAPIHelper { def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] = JavaPairDStream.fromJavaDStream(ds) + /** an empty Map[String, String] for default opConf **/ + val emptyStrStrMap: Map[String, String] = Map.empty } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala index 834d6a5..72a5bb1 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala @@ -22,6 +22,9 @@ package object connector { final val OnePartitionPartitionerName = OnePartitionPartitioner.name final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name + final val RDDSaveBatchSizePropKey = "rdd.save.batch.size" + final val RDDSaveBatchSizeDefault = 10000 + implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions = new GemFireSparkContextFunctions(sc) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala index 91eb784..f064498 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala @@ -18,18 +18,26 @@ class GemFireDStreamFunctions[T](val dstream: DStream[T]) extends Serializable w * @param regionPath the full path of region that the DStream is stored * @param func the function that converts elements of the DStream to key/value pairs * @param connConf the GemFireConnectionConf object that 
provides connection to GemFire cluster + * @param opConf the optional parameters for this operation */ def saveToGemfire[K, V]( - regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = { + regionPath: String, + func: T => (K, V), + connConf: GemFireConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { connConf.getConnection.validateRegion[K, V](regionPath) - val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf) + val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf) logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _)) } /** this version of saveToGemfire is just for Java API */ - def saveToGemfire[K, V](regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = { - saveToGemfire[K, V](regionPath, func.call _, connConf) + def saveToGemfire[K, V]( + regionPath: String, + func: PairFunction[T, K, V], + connConf: GemFireConnectionConf, + opConf: Map[String, String] ): Unit = { + saveToGemfire[K, V](regionPath, func.call _, connConf, opConf) } private[connector] def defaultConnectionConf: GemFireConnectionConf = @@ -48,10 +56,14 @@ class GemFirePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Ser * Save the DStream of pairs to GemFire key-value store without any conversion * @param regionPath the full path of region that the DStream is stored * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + * @param opConf the optional parameters for this operation */ - def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = { + def saveToGemfire( + regionPath: String, + connConf: GemFireConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { connConf.getConnection.validateRegion[K, 
V](regionPath) - val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf) + val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf) logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _)) } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70448c5d/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala index 659fca2..fdf5ff1 100644 --- a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala @@ -61,30 +61,54 @@ class GemFireRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar { } test("test PairRDDFunctions.saveToGemfire") { + verifyPairRDDFunction(useOpConf = false) + } + + test("test PairRDDFunctions.saveToGemfire w/ opConf") { + verifyPairRDDFunction(useOpConf = true) + } + + def verifyPairRDDFunction(useOpConf: Boolean): Unit = { import io.pivotal.gemfire.spark.connector._ val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") val mockRDD = mock[RDD[(String, String)]] val mockSparkContext = mock[SparkContext] when(mockRDD.sparkContext).thenReturn(mockSparkContext) - val result = mockRDD.saveToGemfire(regionPath, mockConnConf) + val result = + if (useOpConf) + mockRDD.saveToGemfire(regionPath, mockConnConf, 
Map(RDDSaveBatchSizePropKey -> "5000")) + else + mockRDD.saveToGemfire(regionPath, mockConnConf) verify(mockConnection, times(1)).validateRegion[String, String](regionPath) result === Unit verify(mockSparkContext, times(1)).runJob[(String, String), Unit]( mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]])) // Note: current implementation make following code not compilable - // so not negative test for this case + // so not negative test for this case // val rdd: RDD[(K, V)] = ... // rdd.saveToGemfire(regionPath, s => (s.length, s)) } test("test RDDFunctions.saveToGemfire") { + verifyRDDFunction(useOpConf = false) + } + + test("test RDDFunctions.saveToGemfire w/ opConf") { + verifyRDDFunction(useOpConf = true) + } + + def verifyRDDFunction(useOpConf: Boolean): Unit = { import io.pivotal.gemfire.spark.connector._ val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") val mockRDD = mock[RDD[(String)]] val mockSparkContext = mock[SparkContext] when(mockRDD.sparkContext).thenReturn(mockSparkContext) - val result = mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf) + val result = + if (useOpConf) + mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000")) + else + mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf) verify(mockConnection, times(1)).validateRegion[Int, String](regionPath) result === Unit verify(mockSparkContext, times(1)).runJob[String, Unit](