Repository: ignite Updated Branches: refs/heads/master fc1241387 -> b583fb3ee
ignite-3009 Changed test to use streamer with allowOverwrite=true to avoid known issue with allowOverwrite=false on unstable topology. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b583fb3e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b583fb3e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b583fb3e Branch: refs/heads/master Commit: b583fb3ee1ac54e9f405adbeff4e582e6fd8537d Parents: fc12413 Author: sboikov <[email protected]> Authored: Fri May 20 09:17:15 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri May 20 09:17:15 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spark/JavaIgniteRDD.scala | 6 ++++-- .../spark/JavaEmbeddedIgniteRDDSelfTest.java | 19 ++++++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b583fb3e/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala index 2e8702e..40bceab 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala @@ -80,12 +80,14 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd)) - def savePairs(jrdd: JavaPairRDD[K, V]) = { + def savePairs(jrdd: JavaPairRDD[K, V], overwrite: Boolean) = { val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd) - rdd.savePairs(rrdd) + rdd.savePairs(rrdd, overwrite) } + def savePairs(jrdd: JavaPairRDD[K, V]) : Unit = savePairs(jrdd, overwrite = false) + def clear(): Unit = 
rdd.clear() } http://git-wip-us.apache.org/repos/asf/ignite/blob/b583fb3e/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java index 5ceaca7..0c4d556 100644 --- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java @@ -106,6 +106,8 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { /** * Creates default spark context + * + * @return Context. */ private JavaSparkContext createContext() { SparkConf conf = new SparkConf(); @@ -127,7 +129,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false); ic.fromCache(PARTITIONED_CACHE_NAME) - .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F)); + .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true); Ignite ignite = ic.ignite(); @@ -186,8 +188,6 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. 
*/ public void testQueryObjectsFromIgnite() throws Exception { - fail("IGNITE-3009"); - JavaSparkContext sc = createContext(); JavaIgniteContext<String, Entity> ic = null; @@ -198,10 +198,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); int cnt = 1001; - - List<Integer> numbers = F.range(0, cnt); - - cache.savePairs(sc.parallelize(numbers, GRID_CNT).mapToPair(INT_TO_ENTITY_F)); + cache.savePairs(sc.parallelize(F.range(0, cnt), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true); List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); @@ -238,7 +235,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); - cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F)); + cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true); DataFrame df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000); @@ -281,6 +278,8 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { /** * @param gridName Grid name. * @param client Client. + * @throws Exception If failed. + * @return Configuration. 
*/ private static CacheConfiguration<Object, Object> cacheConfiguration() { CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); @@ -340,4 +341,4 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { return t._2(); } } -} \ No newline at end of file +}
