Repository: ignite
Updated Branches:
  refs/heads/master fc1241387 -> b583fb3ee


ignite-3009 Changed test to use streamer with allowOverwrite=true to avoid 
known issue with allowOverwrite=false on unstable topology.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b583fb3e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b583fb3e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b583fb3e

Branch: refs/heads/master
Commit: b583fb3ee1ac54e9f405adbeff4e582e6fd8537d
Parents: fc12413
Author: sboikov <[email protected]>
Authored: Fri May 20 09:17:15 2016 +0300
Committer: sboikov <[email protected]>
Committed: Fri May 20 09:17:15 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spark/JavaIgniteRDD.scala  |  6 ++++--
 .../spark/JavaEmbeddedIgniteRDDSelfTest.java     | 19 ++++++++++---------
 2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b583fb3e/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index 2e8702e..40bceab 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -80,12 +80,14 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
 
     def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
 
-    def savePairs(jrdd: JavaPairRDD[K, V]) = {
+    def savePairs(jrdd: JavaPairRDD[K, V], overwrite: Boolean) = {
         val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
 
-        rdd.savePairs(rrdd)
+        rdd.savePairs(rrdd, overwrite)
     }
 
+    def savePairs(jrdd: JavaPairRDD[K, V]) : Unit = savePairs(jrdd, overwrite = false)
+
     def clear(): Unit = rdd.clear()
 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b583fb3e/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index 5ceaca7..0c4d556 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -106,6 +106,8 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
 
     /**
      * Creates default spark context
+     *
+     * @return Context.
      */
     private JavaSparkContext createContext() {
         SparkConf conf = new SparkConf();
@@ -127,7 +129,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
             ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
 
             ic.fromCache(PARTITIONED_CACHE_NAME)
-                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F));
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true);
 
             Ignite ignite = ic.ignite();
 
@@ -186,8 +188,6 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testQueryObjectsFromIgnite() throws Exception {
-        fail("IGNITE-3009");
-
         JavaSparkContext sc = createContext();
 
         JavaIgniteContext<String, Entity> ic = null;
@@ -198,10 +198,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
             JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
 
             int cnt = 1001;
-
-            List<Integer> numbers = F.range(0, cnt);
-
-            cache.savePairs(sc.parallelize(numbers, GRID_CNT).mapToPair(INT_TO_ENTITY_F));
+            cache.savePairs(sc.parallelize(F.range(0, cnt), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
 
             List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
                 .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
@@ -238,7 +235,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
 
             JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
 
-            cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F));
+            cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
 
             DataFrame df =
                 cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
@@ -281,6 +278,8 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
     /**
      * @param gridName Grid name.
      * @param client Client.
+     * @throws Exception If failed.
+     * @return Configuration.
      */
     private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
         IgniteConfiguration cfg = new IgniteConfiguration();
@@ -302,6 +301,8 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
 
     /**
      * Creates cache configuration.
+     *
+     * @return Cache configuration.
      */
     private static CacheConfiguration<Object, Object> cacheConfiguration() {
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
@@ -340,4 +341,4 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
             return t._2();
         }
     }
-}
\ No newline at end of file
+}

Reply via email to