IGNITE-2941: Add getOrStart method to Ignition. IGNITE-2942: Use getOrStart in IgniteContext instead of the current try-catch structure. This closes #631. Reviewed by Denis Magda, Alexey Goncharuk.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9d375e5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9d375e5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9d375e5 Branch: refs/heads/ignite-2926 Commit: a9d375e595e504ce22c684b91d1a0da228c4a606 Parents: 3779fe4 Author: Alexei Scherbakov <[email protected]> Authored: Tue Apr 19 07:11:54 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Tue Apr 19 07:11:54 2016 +0300 ---------------------------------------------------------------------- .../main/java/org/apache/ignite/Ignition.java | 19 +- .../org/apache/ignite/internal/IgnitionEx.java | 60 +++- .../ignite/internal/GridGetOrStartSelfTest.java | 129 +++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 16 +- .../org/apache/ignite/spark/IgniteContext.scala | 73 ++-- .../apache/ignite/spark/JavaIgniteContext.scala | 14 +- .../spark/JavaEmbeddedIgniteRDDSelfTest.java | 343 +++++++++++++++++++ .../ignite/spark/JavaIgniteRDDSelfTest.java | 302 ---------------- .../spark/JavaStandaloneIgniteRDDSelfTest.java | 302 ++++++++++++++++ .../ignite/testsuites/IgniteRDDTestSuite.java | 40 +++ 10 files changed, 935 insertions(+), 363 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/main/java/org/apache/ignite/Ignition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java index 99ee1d9..b4c01f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignition.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java @@ -309,7 +309,7 @@ public class Ignition { } /** - * Starts grid with given configuration. 
Note that this method is no-op if grid with the name + * Starts grid with given configuration. Note that this method will throw an exception if grid with the name * provided in given configuration is already started. * * @param cfg Grid configuration. This cannot be {@code null}. @@ -401,6 +401,23 @@ public class Ignition { } } + + /** + * Gets or starts new grid instance if it hasn't been started yet. + * + * @param cfg Grid configuration. This cannot be {@code null}. + * @return Grid instance. + * @throws IgniteException If grid could not be started. + */ + public static Ignite getOrStart(IgniteConfiguration cfg) throws IgniteException { + try { + return IgnitionEx.start(cfg, false); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + /** * Loads Spring bean by its name from given Spring XML configuration file. If bean * with such name doesn't exist, exception is thrown. http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 533b6d8..9a83826 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -499,7 +499,7 @@ public class IgnitionEx { U.warn(null, "Default Spring XML file not found (is IGNITE_HOME set?): " + DFLT_CFG); - return start0(new GridStartContext(new IgniteConfiguration(), null, springCtx)).grid(); + return start0(new GridStartContext(new IgniteConfiguration(), null, springCtx), true).grid(); } /** @@ -512,11 +512,25 @@ public class IgnitionEx { * also if named grid has already been started. 
*/ public static Ignite start(IgniteConfiguration cfg) throws IgniteCheckedException { - return start(cfg, null); + return start(cfg, null, true); } /** - * Starts grid with given configuration. Note that this method is no-op if grid with the name + * Starts a grid with given configuration. If the grid is already started and failIfStarted set to TRUE + * an exception will be thrown. + * + * @param cfg Grid configuration. This cannot be {@code null}. + * failIfStarted Throw or not an exception if grid is already started. + * @return Started grid. + * @throws IgniteCheckedException If grid could not be started. This exception will be thrown + * also if named grid has already been started. + */ + public static Ignite start(IgniteConfiguration cfg, boolean failIfStarted) throws IgniteCheckedException { + return start(cfg, null, failIfStarted); + } + + /** + * Starts grid with given configuration. Note that this method will throw and exception if grid with the name * provided in given configuration is already started. * * @param cfg Grid configuration. This cannot be {@code null}. @@ -531,7 +545,27 @@ public class IgnitionEx { public static Ignite start(IgniteConfiguration cfg, @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException { A.notNull(cfg, "cfg"); - return start0(new GridStartContext(cfg, null, springCtx)).grid(); + return start0(new GridStartContext(cfg, null, springCtx), true).grid(); + } + + /** + * Starts grid with given configuration. If the grid is already started and failIfStarted set to TRUE + * an exception will be thrown. + * + * @param cfg Grid configuration. This cannot be {@code null}. + * @param springCtx Optional Spring application context, possibly {@code null}. + * Spring bean definitions for bean injection are taken from this context. + * If provided, this context can be injected into grid tasks and grid jobs using + * {@link SpringApplicationContextResource @SpringApplicationContextResource} annotation. 
+ * @param failIfStarted Throw or not an exception if grid is already started. + * @return Started grid. + * @throws IgniteCheckedException If grid could not be started. This exception will be thrown + * also if named grid has already been started. + */ + public static Ignite start(IgniteConfiguration cfg, @Nullable GridSpringResourceContext springCtx, boolean failIfStarted) throws IgniteCheckedException { + A.notNull(cfg, "cfg"); + + return start0(new GridStartContext(cfg, null, springCtx), failIfStarted).grid(); } /** @@ -927,7 +961,7 @@ public class IgnitionEx { // Use either user defined context or our one. IgniteNamedInstance grid = start0( - new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx)); + new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx), true); // Add it if it was not stopped during startup. if (grid != null) @@ -958,10 +992,11 @@ public class IgnitionEx { * Starts grid with given configuration. * * @param startCtx Start context. + * @param failIfStarted Throw or not an exception if grid is already started. * @return Started grid. * @throws IgniteCheckedException If grid could not be started. 
*/ - private static IgniteNamedInstance start0(GridStartContext startCtx) throws IgniteCheckedException { + private static IgniteNamedInstance start0(GridStartContext startCtx, boolean failIfStarted ) throws IgniteCheckedException { assert startCtx != null; String name = startCtx.config().getGridName(); @@ -984,12 +1019,15 @@ public class IgnitionEx { } } - if (old != null) { - if (name == null) - throw new IgniteCheckedException("Default Ignite instance has already been started."); + if (old != null) + if (failIfStarted) { + if (name == null) + throw new IgniteCheckedException("Default Ignite instance has already been started."); + else + throw new IgniteCheckedException("Ignite instance with this name has already been started: " + name); + } else - throw new IgniteCheckedException("Ignite instance with this name has already been started: " + name); - } + return old; if (startCtx.config().getWarmupClosure() != null) startCtx.config().getWarmupClosure().apply(startCtx.config()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java new file mode 100644 index 0000000..9b3985e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.*; + +/** + * The GridGetOrStartSelfTest tests get or start semantics. + */ + +@GridCommonTest(group = "Kernal Self") +public class GridGetOrStartSelfTest extends GridCommonAbstractTest { + /** Concurrency. */ + public static final int CONCURRENCY = 10; + + /** + * Default constructor. 
+ */ + public GridGetOrStartSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Tests default grid + */ + public void testDefaultGridGetOrStart() throws Exception { + IgniteConfiguration cfg = getConfiguration(null); + + try (Ignite ignite = Ignition.getOrStart(cfg)) { + try { + Ignition.start(cfg); + + fail("Expected exception after grid started"); + } + catch (IgniteException ignored) { + } + + Ignite ignite2 = Ignition.getOrStart(cfg); + + assertEquals("Must return same instance", ignite, ignite2); + } + + assertTrue(G.allGrids().isEmpty()); + } + + /** + * Tests named grid + */ + public void testNamedGridGetOrStart() throws Exception { + IgniteConfiguration cfg = getConfiguration("test"); + try (Ignite ignite = Ignition.getOrStart(cfg)) { + try { + Ignition.start(cfg); + + fail("Expected exception after grid started"); + } + catch (IgniteException ignored) { + // No-op. + } + + Ignite ignite2 = Ignition.getOrStart(cfg); + + assertEquals("Must return same instance", ignite, ignite2); + } + + assertTrue(G.allGrids().isEmpty()); + } + + /** + * Tests concurrent grid initialization + */ + public void testConcurrentGridGetOrStartCon() throws Exception { + final IgniteConfiguration cfg = getConfiguration(null); + + final AtomicReference<Ignite> ref = new AtomicReference<>(); + + try { + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + // must return same instance in each thread + + try { + Ignite ignite = Ignition.getOrStart(cfg); + + boolean set = ref.compareAndSet(null, ignite); + + if (!set) + assertEquals(ref.get(), ignite); + } + catch (IgniteException e) { + throw new RuntimeException("Ignite error", e); + } + } + }, CONCURRENCY, "GridCreatorThread"); + } + catch (Exception ignored) { + fail("Exception is not expected"); + } + + G.stopAll(true); + + assertTrue(G.allGrids().isEmpty()); + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 9e2324c..bb4b0f0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -20,19 +20,7 @@ package org.apache.ignite.testsuites; import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.GridSuppressedExceptionSelfTest; -import org.apache.ignite.internal.ClusterGroupHostsSelfTest; -import org.apache.ignite.internal.ClusterGroupSelfTest; -import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest; -import org.apache.ignite.internal.GridLifecycleAwareSelfTest; -import org.apache.ignite.internal.GridLifecycleBeanSelfTest; -import org.apache.ignite.internal.GridNodeMetricsLogSelfTest; -import org.apache.ignite.internal.GridProjectionForCachesSelfTest; -import org.apache.ignite.internal.GridReduceSelfTest; -import org.apache.ignite.internal.GridReleaseTypeSelfTest; -import org.apache.ignite.internal.GridSelfTest; -import org.apache.ignite.internal.GridStartStopSelfTest; -import org.apache.ignite.internal.GridStopWithCancelSelfTest; -import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest; import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest; import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest; @@ -129,6 +117,8 @@ public class IgniteBasicTestSuite extends TestSuite { 
 suite.addTestSuite(VariationsIteratorTest.class); suite.addTestSuite(ConfigVariationsTestSuiteBuilderTest.class); + GridTestUtils.addTestIfNeeded(suite, GridGetOrStartSelfTest.class, ignoredTests); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 57fe84f..182605c 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -17,13 +17,12 @@ package org.apache.ignite.spark - -import org.apache.ignite.internal.IgnitionEx -import org.apache.ignite.internal.util.IgniteUtils import org.apache.ignite._ import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} -import org.apache.spark.{Logging, SparkContext} +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.internal.util.IgniteUtils import org.apache.spark.sql.SQLContext +import org.apache.spark.{Logging, SparkContext} /** * Ignite context. @@ -36,15 +35,13 @@ import org.apache.spark.sql.SQLContext class IgniteContext[K, V]( @transient val sparkContext: SparkContext, cfgF: () ⇒ IgniteConfiguration, - client: Boolean = true -) extends Serializable with Logging { - @transient private val driver = true - + standalone: Boolean = true + ) extends Serializable with Logging { private val cfgClo = new Once(cfgF) private val igniteHome = IgniteUtils.getIgniteHome - if (!client) { + if (!standalone) { // Get required number of executors with default equals to number of available executors. 
 val workers = sparkContext.getConf.getInt("spark.executor.instances", sparkContext.getExecutorStorageStatus.length) @@ -55,7 +52,7 @@ class IgniteContext[K, V]( logInfo("Will start Ignite nodes on " + workers + " workers") // Start ignite server node on each worker in server mode. - sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite()) + sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ ignite()) } // Make sure to start Ignite on context creation. @@ -71,7 +68,7 @@ class IgniteContext[K, V]( sc: SparkContext, springUrl: String, client: Boolean - ) { + ) { this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1(), client) } @@ -84,7 +81,7 @@ class IgniteContext[K, V]( def this( sc: SparkContext, springUrl: String - ) { + ) { this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1()) } @@ -124,10 +121,8 @@ class IgniteContext[K, V]( } /** - * Gets an Ignite instance supporting this context. Ignite instance will be started - * if it has not been started yet. - * - * @return Ignite instance. + * Get or start Ignite instance it it's not started yet. + * @return */ def ignite(): Ignite = { val home = IgniteUtils.getIgniteHome @@ -142,24 +137,17 @@ class IgniteContext[K, V]( val igniteCfg = cfgClo() + // check if called from driver + if (sparkContext != null) igniteCfg.setClientMode(true) + try { - Ignition.ignite(igniteCfg.getGridName) + Ignition.getOrStart(igniteCfg) } catch { - case e: IgniteIllegalStateException ⇒ - try { - igniteCfg.setClientMode(client || driver) - - Ignition.start(igniteCfg) - } - catch { - case e: IgniteException ⇒ { - logError("Failed to start Ignite client. Will try to use an existing instance with name: " - + igniteCfg.getGridName, e) - - Ignition.ignite(igniteCfg.getGridName) - } - } + case e: IgniteException ⇒ + logError("Failed to start Ignite.", e) + + throw e } } @@ -167,7 +155,25 @@ class IgniteContext[K, V]( * Stops supporting ignite instance. 
 If ignite instance has been already stopped, this operation will be * a no-op. */ - def close() = { + def close(shutdownIgniteOnWorkers: Boolean = false) = { + // additional check if called from driver + if (sparkContext != null && shutdownIgniteOnWorkers) { + // Get required number of executors with default equals to number of available executors. + val workers = sparkContext.getConf.getInt("spark.executor.instances", + sparkContext.getExecutorStorageStatus.length) + + if (workers > 0) { + logInfo("Will stop Ignite nodes on " + workers + " workers") + + // Start ignite server node on each worker in server mode. + sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ doClose()) + } + } + + doClose() + } + + private def doClose() = { val igniteCfg = cfgClo() Ignition.stop(igniteCfg.getGridName, false) @@ -184,8 +190,11 @@ private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable { def apply(): IgniteConfiguration = { if (res == null) { + this.synchronized { + if (res == null) + res = clo() } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala index e2d57bf..44b1cd9 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala @@ -34,10 +34,16 @@ import scala.reflect.ClassTag * @tparam V Value type. 
*/ class JavaIgniteContext[K, V]( - @scala.transient val sc: JavaSparkContext, - val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable { + @transient val sc: JavaSparkContext, + val cfgF: IgniteOutClosure[IgniteConfiguration], + standalone: Boolean = true + ) extends Serializable { - @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply()) + @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply(), standalone) + + def this(sc: JavaSparkContext, cfgF: IgniteOutClosure[IgniteConfiguration]) { + this(sc, cfgF, true) + } def this(sc: JavaSparkContext, springUrl: String) { this(sc, new IgniteOutClosure[IgniteConfiguration] { @@ -53,7 +59,7 @@ class JavaIgniteContext[K, V]( def ignite(): Ignite = ic.ignite() - def close() = ic.close() + def close(shutdownIgniteOnWorkers:Boolean = false) = ic.close(shutdownIgniteOnWorkers) private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java new file mode 100644 index 0000000..5ceaca7 --- /dev/null +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import scala.Tuple2; + +/** + * Tests for {@link JavaIgniteRDD} (embedded mode). + */ +public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { + /** For grid names generation */ + private static AtomicInteger cntr = new AtomicInteger(1); + + /** Grid names. */ + private static ThreadLocal<Integer> gridNames = new ThreadLocal<Integer>() { + @Override protected Integer initialValue() { + return cntr.getAndIncrement(); + } + }; + + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Keys count. 
*/ + private static final int KEYS_CNT = 10000; + + /** Cache name. */ + private static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Sum function. */ + private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { + public Integer call(Integer x, Integer y) { + return x + y; + } + }; + + /** To pair function. */ + private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() { + /** {@inheritDoc} */ + @Override public Tuple2<String, String> call(Integer i) { + return new Tuple2<>(String.valueOf(i), "val" + i); + } + }; + + /** (String, Integer); pair to Integer value function. */ + private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>(); + + /** (String, Entity) pair to Entity value function. */ + private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F = + new PairToValueFunction<>(); + + /** Integer to entity function. */ + private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F = + new PairFunction<Integer, String, Entity>() { + @Override public Tuple2<String, Entity> call(Integer i) throws Exception { + return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100)); + } + }; + + /** + * Default constructor. + */ + public JavaEmbeddedIgniteRDDSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Creates default spark context + */ + private JavaSparkContext createContext() { + SparkConf conf = new SparkConf(); + + conf.set("spark.executor.instances", String.valueOf(GRID_CNT)); + + return new JavaSparkContext("local[" + GRID_CNT + "]", "test", conf); + } + + /** + * @throws Exception If failed. 
+ */ + public void testStoreDataToIgnite() throws Exception { + JavaSparkContext sc = createContext(); + + JavaIgniteContext<String, String> ic = null; + + try { + ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false); + + ic.fromCache(PARTITIONED_CACHE_NAME) + .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F)); + + Ignite ignite = ic.ignite(); + + IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) { + String val = cache.get(String.valueOf(i)); + + assertNotNull("Value was not put to cache for key: " + i, val); + assertEquals("Invalid value stored for key: " + i, "val" + i, val); + } + } + finally { + if (ic != null) + ic.close(true); + + sc.stop(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReadDataFromIgnite() throws Exception { + JavaSparkContext sc = createContext(); + + JavaIgniteContext<String, Integer> ic = null; + + try { + ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false); + + Ignite ignite = ic.ignite(); + + IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) + cache.put(String.valueOf(i), i); + + JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F); + + int sum = values.fold(0, SUM_F); + + int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT; + + assertEquals(expSum, sum); + } + finally { + if (ic != null) + ic.close(true); + + sc.stop(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testQueryObjectsFromIgnite() throws Exception { + fail("IGNITE-3009"); + + JavaSparkContext sc = createContext(); + + JavaIgniteContext<String, Entity> ic = null; + + try { + ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + int cnt = 1001; + + List<Integer> numbers = F.range(0, cnt); + + cache.savePairs(sc.parallelize(numbers, GRID_CNT).mapToPair(INT_TO_ENTITY_F)); + + List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) + .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); + + assertEquals("Invalid result length", 1, res.size()); + assertEquals("Invalid result", 50, res.get(0).id()); + assertEquals("Invalid result", "name50", res.get(0).name()); + assertEquals("Invalid result", 5000, res.get(0).salary()); + +// Ignite ignite = ic.ignite(); +// IgniteCache<Object, Object> underCache = ignite.cache(PARTITIONED_CACHE_NAME); +// assertEquals("Invalid total count", cnt, underCache.size()); + + assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count()); + } + finally { + if (ic != null) + ic.close(true); + + sc.stop(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryFieldsFromIgnite() throws Exception { + JavaSparkContext sc = createContext(); + + JavaIgniteContext<String, Entity> ic = null; + + try { + ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F)); + + DataFrame df = + cache.sql("select id, name, salary from Entity where name = ? 
and salary = ?", "name50", 5000); + + df.printSchema(); + + Row[] res = df.collect(); + + assertEquals("Invalid result length", 1, res.length); + assertEquals("Invalid result", 50, res[0].get(0)); + assertEquals("Invalid result", "name50", res[0].get(1)); + assertEquals("Invalid result", 5000, res[0].get(2)); + + Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000)); + + DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp); + + df.printSchema(); + + Row[] res0 = df0.collect(); + + assertEquals("Invalid result length", 1, res0.length); + assertEquals("Invalid result", 50, res0[0].get(0)); + assertEquals("Invalid result", "name50", res0[0].get(1)); + assertEquals("Invalid result", 5000, res0[0].get(2)); + + assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count()); + } + finally { + if (ic != null) + ic.close(true); + + sc.stop(); + } + } + + /** Finder. */ + private static TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * @param gridName Grid name. + * @param client Client. + */ + private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception { + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setClientMode(client); + + cfg.setGridName(gridName); + + return cfg; + } + + /** + * Creates cache configuration. + */ + private static CacheConfiguration<Object, Object> cacheConfiguration() { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setBackups(1); + + ccfg.setName(PARTITIONED_CACHE_NAME); + + ccfg.setIndexedTypes(String.class, Entity.class); + + return ccfg; + } + + /** + * Ignite configiration provider. 
+ */ + static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public IgniteConfiguration apply() { + try { + return getConfiguration("worker-" + gridNames.get(), false); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * @param <K> + * @param <V> + */ + static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> { + /** {@inheritDoc} */ + @Override public V call(Tuple2<K, V> t) throws Exception { + return t._2(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java deleted file mode 100644 index becd90a..0000000 --- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.spark; - -import java.util.List; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.Ignition; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Column; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.Row; -import scala.Tuple2; - -/** - * Tests for {@link JavaIgniteRDD}. - */ -public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest { - /** Grid count. */ - private static final int GRID_CNT = 3; - - /** Keys count. */ - private static final int KEYS_CNT = 10000; - - /** Cache name. */ - private static final String PARTITIONED_CACHE_NAME = "partitioned"; - - /** Ip finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Sum function. */ - private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { - public Integer call(Integer x, Integer y) { - return x + y; - } - }; - - /** To pair function. 
*/ - private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() { - /** {@inheritDoc} */ - @Override public Tuple2<String, String> call(Integer i) { - return new Tuple2<>(String.valueOf(i), "val" + i); - } - }; - - /** (String, Integer); pair to Integer value function. */ - private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>(); - - /** (String, Entity) pair to Entity value function. */ - private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F = - new PairToValueFunction<>(); - - /** Integer to entity function. */ - private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F = - new PairFunction<Integer, String, Entity>() { - @Override public Tuple2<String, Entity> call(Integer i) throws Exception { - return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100)); - } - }; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - Ignition.stop("client", false); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - for (int i = 0; i < GRID_CNT; i++) - Ignition.start(getConfiguration("grid-" + i, false)); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - for (int i = 0; i < GRID_CNT; i++) - Ignition.stop("grid-" + i, false); - } - - /** - * @throws Exception If failed. 
- */ - public void testStoreDataToIgnite() throws Exception { - JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); - - try { - JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); - - ic.fromCache(PARTITIONED_CACHE_NAME) - .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F)); - - Ignite ignite = Ignition.ignite("grid-0"); - - IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME); - - for (int i = 0; i < KEYS_CNT; i++) { - String val = cache.get(String.valueOf(i)); - - assertNotNull("Value was not put to cache for key: " + i, val); - assertEquals("Invalid value stored for key: " + i, "val" + i, val); - } - } - finally { - sc.stop(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReadDataFromIgnite() throws Exception { - JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); - - try { - JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); - - Ignite ignite = Ignition.ignite("grid-0"); - - IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME); - - for (int i = 0; i < KEYS_CNT; i++) - cache.put(String.valueOf(i), i); - - JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F); - - int sum = values.fold(0, SUM_F); - - int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT; - - assertEquals(expSum, sum); - } - finally { - sc.stop(); - } - } - - /** - * @throws Exception If failed. - */ - public void testQueryObjectsFromIgnite() throws Exception { - JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); - - try { - JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); - - JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); - - cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); - - List<Entity> res = cache.objectSql("Entity", "name = ? 
and salary = ?", "name50", 5000) - .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); - - assertEquals("Invalid result length", 1, res.size()); - assertEquals("Invalid result", 50, res.get(0).id()); - assertEquals("Invalid result", "name50", res.get(0).name()); - assertEquals("Invalid result", 5000, res.get(0).salary()); - assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count()); - } - finally { - sc.stop(); - } - } - - /** - * @throws Exception If failed. - */ - public void testQueryFieldsFromIgnite() throws Exception { - JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); - - try { - JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); - - JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); - - cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); - - DataFrame df = - cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000); - - df.printSchema(); - - Row[] res = df.collect(); - - assertEquals("Invalid result length", 1, res.length); - assertEquals("Invalid result", 50, res[0].get(0)); - assertEquals("Invalid result", "name50", res[0].get(1)); - assertEquals("Invalid result", 5000, res[0].get(2)); - - Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000)); - - DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp); - - df.printSchema(); - - Row[] res0 = df0.collect(); - - assertEquals("Invalid result length", 1, res0.length); - assertEquals("Invalid result", 50, res0[0].get(0)); - assertEquals("Invalid result", "name50", res0[0].get(1)); - assertEquals("Invalid result", 5000, res0[0].get(2)); - - assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count()); - } - finally { - sc.stop(); - } - - } - - /** - * @param gridName Grid name. - * @param client Client. 
- */ - private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception { - IgniteConfiguration cfg = new IgniteConfiguration(); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setCacheConfiguration(cacheConfiguration()); - - cfg.setClientMode(client); - - cfg.setGridName(gridName); - - return cfg; - } - - /** - * Creates cache configuration. - */ - private static CacheConfiguration<Object, Object> cacheConfiguration() { - CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - - ccfg.setBackups(1); - - ccfg.setName(PARTITIONED_CACHE_NAME); - - ccfg.setIndexedTypes(String.class, Entity.class); - - return ccfg; - } - - /** - * Ignite configiration provider. - */ - static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> { - /** {@inheritDoc} */ - @Override public IgniteConfiguration apply() { - try { - return getConfiguration("client", true); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - /** - * @param <K> - * @param <V> - */ - static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> { - /** {@inheritDoc} */ - @Override public V call(Tuple2<K, V> t) throws Exception { - return t._2(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java new file mode 100644 index 0000000..faa8fda --- /dev/null +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark; + +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import scala.Tuple2; + +/** + * Tests for {@link JavaIgniteRDD} (standalone mode). + */ +public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { + /** Grid count. 
*/ + private static final int GRID_CNT = 3; + + /** Keys count. */ + private static final int KEYS_CNT = 10000; + + /** Cache name. */ + private static final String PARTITIONED_CACHE_NAME = "partitioned"; + + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Sum function. */ + private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() { + public Integer call(Integer x, Integer y) { + return x + y; + } + }; + + /** To pair function. */ + private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() { + /** {@inheritDoc} */ + @Override public Tuple2<String, String> call(Integer i) { + return new Tuple2<>(String.valueOf(i), "val" + i); + } + }; + + /** (String, Integer); pair to Integer value function. */ + private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>(); + + /** (String, Entity) pair to Entity value function. */ + private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F = + new PairToValueFunction<>(); + + /** Integer to entity function. 
*/ + private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F = + new PairFunction<Integer, String, Entity>() { + @Override public Tuple2<String, Entity> call(Integer i) throws Exception { + return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100)); + } + }; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + Ignition.stop("client", false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < GRID_CNT; i++) + Ignition.start(getConfiguration("grid-" + i, false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + for (int i = 0; i < GRID_CNT; i++) + Ignition.stop("grid-" + i, false); + } + + /** + * @throws Exception If failed. + */ + public void testStoreDataToIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + ic.fromCache(PARTITIONED_CACHE_NAME) + .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F)); + + Ignite ignite = Ignition.ignite("grid-0"); + + IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) { + String val = cache.get(String.valueOf(i)); + + assertNotNull("Value was not put to cache for key: " + i, val); + assertEquals("Invalid value stored for key: " + i, "val" + i, val); + } + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testReadDataFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + Ignite ignite = Ignition.ignite("grid-0"); + + IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME); + + for (int i = 0; i < KEYS_CNT; i++) + cache.put(String.valueOf(i), i); + + JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F); + + int sum = values.fold(0, SUM_F); + + int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT; + + assertEquals(expSum, sum); + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryObjectsFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); + + List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) + .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); + + assertEquals("Invalid result length", 1, res.size()); + assertEquals("Invalid result", 50, res.get(0).id()); + assertEquals("Invalid result", "name50", res.get(0).name()); + assertEquals("Invalid result", 5000, res.get(0).salary()); + assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count()); + } + finally { + sc.stop(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testQueryFieldsFromIgnite() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + try { + JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); + + DataFrame df = + cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000); + + df.printSchema(); + + Row[] res = df.collect(); + + assertEquals("Invalid result length", 1, res.length); + assertEquals("Invalid result", 50, res[0].get(0)); + assertEquals("Invalid result", "name50", res[0].get(1)); + assertEquals("Invalid result", 5000, res[0].get(2)); + + Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000)); + + DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp); + + df.printSchema(); + + Row[] res0 = df0.collect(); + + assertEquals("Invalid result length", 1, res0.length); + assertEquals("Invalid result", 50, res0[0].get(0)); + assertEquals("Invalid result", "name50", res0[0].get(1)); + assertEquals("Invalid result", 5000, res0[0].get(2)); + + assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count()); + } + finally { + sc.stop(); + } + + } + + /** + * @param gridName Grid name. + * @param client Client. + */ + private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception { + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setClientMode(client); + + cfg.setGridName(gridName); + + return cfg; + } + + /** + * Creates cache configuration. 
+ */ + private static CacheConfiguration<Object, Object> cacheConfiguration() { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setBackups(1); + + ccfg.setName(PARTITIONED_CACHE_NAME); + + ccfg.setIndexedTypes(String.class, Entity.class); + + return ccfg; + } + + /** + * Ignite configiration provider. + */ + static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public IgniteConfiguration apply() { + try { + return getConfiguration("client", true); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * @param <K> + * @param <V> + */ + static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> { + /** {@inheritDoc} */ + @Override public V call(Tuple2<K, V> t) throws Exception { + return t._2(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java b/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java new file mode 100644 index 0000000..a4177f0 --- /dev/null +++ b/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.spark.JavaEmbeddedIgniteRDDSelfTest; +import org.apache.ignite.spark.JavaStandaloneIgniteRDDSelfTest; + +/** + * Test suit for Ignite RDD + */ +public class IgniteRDDTestSuite extends TestSuite { + /** + * @return Java Ignite RDD test suit. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Java Ignite RDD tests (standalone and embedded modes"); + + suite.addTest(new TestSuite(JavaEmbeddedIgniteRDDSelfTest.class)); + suite.addTest(new TestSuite(JavaStandaloneIgniteRDDSelfTest.class)); + + return suite; + } +} \ No newline at end of file
