#IGNITE-389 - Changing API
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7439b5b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7439b5b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7439b5b7 Branch: refs/heads/ignite-389 Commit: 7439b5b7ddb5f6f771c927a2a6f5bdb8b3405392 Parents: 520cd03 Author: Alexey Goncharuk <[email protected]> Authored: Fri May 22 18:49:33 2015 -0700 Committer: Alexey Goncharuk <[email protected]> Committed: Fri May 22 18:49:33 2015 -0700 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 1 - .../processors/cache/GridCacheProcessor.java | 30 ++++-- .../cache/IgniteDynamicCacheStartSelfTest.java | 22 ++-- .../apache/ignite/spark/IgniteAbstractRDD.scala | 38 +++++++ .../org/apache/ignite/spark/IgniteContext.scala | 97 ++--------------- .../org/apache/ignite/spark/IgniteRDD.scala | 106 ++++++++++++++++--- .../spark/examples/IgniteProcessExample.scala | 13 ++- .../spark/examples/IgniteStoreExample.scala | 4 +- .../ignite/spark/impl/IgniteQueryIterator.scala | 17 ++- .../apache/ignite/spark/impl/IgniteSqlRDD.scala | 43 ++++++++ .../spark/util/SerializablePredicate2.scala | 32 ------ 11 files changed, 232 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index df6b2ee..692f1e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * * @param loadPrevVal Load previous value flag. * @return {@code this} for chaining. - * @return {@code this} for chaining. */ public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) { this.loadPrevVal = loadPrevVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0e1a9c2..3c4c7d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1931,7 +1931,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.cacheType(cacheType); - return F.first(initiateCacheChanges(F.asList(req))); + return F.first(initiateCacheChanges(F.asList(req), failIfExists)); } /** @@ -1941,14 +1941,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { public IgniteInternalFuture<?> dynamicStopCache(String cacheName) { DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true); - return F.first(initiateCacheChanges(F.asList(t))); + return F.first(initiateCacheChanges(F.asList(t), false)); } /** * @param reqs Requests. * @return Collection of futures. */ - public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs) { + @SuppressWarnings("TypeMayBeWeakened") + private Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs, + boolean failIfExists) { Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size()); Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size()); @@ -1981,9 +1983,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { maskNull(req.cacheName()), fut); if (old != null) { - if (req.start() && !req.clientStartOnly()) { - fut.onDone(new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already being started or stopped): " + req.cacheName())); + if (req.start()) { + if (!req.clientStartOnly()) { + if (failIfExists) + fut.onDone(new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already being started or stopped): " + + req.cacheName())); + else { + fut = old; + + continue; + } + } + else { + fut = old; + + continue; + } } else { fut = old; @@ -2617,7 +2633,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.clientStartOnly(true); - F.first(initiateCacheChanges(F.asList(req))).get(); + F.first(initiateCacheChanges(F.asList(req), false)).get(); IgniteCache cache = jCacheProxies.get(masked); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index adece63..698ee03 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -793,23 +793,19 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testGetOrCreateMultiNodeTemplate() throws Exception { - for (int i = 0; i < 100; i++) { - info(">>> Iteration " + i); + final AtomicInteger idx = new AtomicInteger(); - final AtomicInteger idx = new AtomicInteger(); - - GridTestUtils.runMultiThreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - int idx0 = idx.getAndIncrement(); + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx0 = idx.getAndIncrement(); - ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME); + ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME); - return null; - } - }, nodeCount() * 4, "runner"); + return null; + } + }, nodeCount() * 4, "runner"); - ignite(0).destroyCache(DYNAMIC_CACHE_NAME); - } + ignite(0).destroyCache(DYNAMIC_CACHE_NAME); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala new file mode 100644 index 0000000..63232be --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import org.apache.ignite.IgniteCache +import org.apache.ignite.configuration.CacheConfiguration +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +abstract class IgniteAbstractRDD[R:ClassTag, K, V] ( + ic: IgniteContext[K, V], + cacheName: String, + cacheCfg: CacheConfiguration[K, V] +) extends RDD[R] (ic.sparkContext, deps = Nil) { + protected def ensureCache(): IgniteCache[K, V] = { + // Make sure to deploy the cache + if (cacheCfg != null) + ic.ignite().getOrCreateCache(cacheCfg) + else + ic.ignite().getOrCreateCache(cacheName) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 56d2a05..9d9f9a7 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -17,98 +17,29 @@ package org.apache.ignite.spark -import javax.cache.Cache -import org.apache.ignite.cache.query.{Query, ScanQuery} -import org.apache.ignite.cluster.ClusterNode import org.apache.ignite.internal.IgnitionEx -import org.apache.ignite.lang.IgniteUuid -import org.apache.ignite.spark.util.SerializablePredicate2 -import org.apache.ignite.{Ignition, IgniteCache, Ignite} +import org.apache.ignite.{Ignition, Ignite} import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag class IgniteContext[K, V]( - @scala.transient sc: SparkContext, - cfgF: () => IgniteConfiguration, - val cacheName: String, - cacheCfg: CacheConfiguration[K, V] + @scala.transient val sparkContext: SparkContext, + cfgF: () => IgniteConfiguration ) extends Serializable { - type ScanRDD[K1, V1] = IgniteRDD[Cache.Entry[K1, V1], K1, V1] - - def this( - sc: SparkContext, - springUrl: String, - cacheName: String - ) { - this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null) - } - - def this( - sc: SparkContext, - cfgF: () => IgniteConfiguration, - cacheName: String - ) { - this(sc, cfgF, cacheName, null) - } - - def this( - sc: SparkContext, - cfgF: () => IgniteConfiguration, - cacheCfg: CacheConfiguration[K, V] - ) { - this(sc, cfgF, cacheCfg.getName, cacheCfg) - } - def this( sc: SparkContext, - springUrl: String, - cacheCfg: CacheConfiguration[K, V] + springUrl: String ) { - this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName, cacheCfg) + this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1()) } - def sparkContext() = sc - - def scan(p: (K, V) => Boolean = (_, _) => true): ScanRDD[K, V] = { - new ScanRDD(this, new ScanQuery[K, V](new SerializablePredicate2[K, V](p))) + def fromCache(cacheName: String): IgniteRDD[K, V] = { + new IgniteRDD[K, V](this, cacheName, null) } - def scan[R:ClassTag](qry: Query[R]): IgniteRDD[R, K, V] = { - new IgniteRDD[R, K, V](this, qry) - } - - def saveToIgnite[T](rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) => T = affinityKeyFunc(_: IgniteContext[K, V], _:V, _: ClusterNode)) = { - rdd.foreachPartition(it => { - println("Using scala version: " + scala.util.Properties.versionString) - // Make sure to deploy the cache - igniteCache() - - val ig = ignite() - - val locNode = ig.cluster().localNode() - - val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode)) - - val streamer = ignite().dataStreamer[T, V](cacheName) - - try { - it.foreach(value => { - val key: T = keyFunc(this, value, node.orNull) - - println("Saving: " + key + ", " + value) - - streamer.addData(key, value) - }) - } - finally { - streamer.close() - } - }) + def fromCache(cacheCfg: CacheConfiguration[K, V]) = { + new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg) } def ignite(): Ignite = { @@ -130,14 +61,4 @@ class IgniteContext[K, V]( } } - private def igniteCache(): IgniteCache[K, V] = { - if (cacheCfg == null) - ignite().getOrCreateCache(cacheName) - else - ignite().getOrCreateCache(cacheCfg) - } - - private def affinityKeyFunc(ic: IgniteContext[K, V], value: V, node: ClusterNode): Object = { - IgniteUuid.randomUuid() - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 4018c53..ce51e9c 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -14,32 +14,114 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.ignite.spark -import org.apache.ignite.cache.query.Query -import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition} -import org.apache.spark.{TaskContext, Partition} +import javax.cache.Cache + +import org.apache.ignite.cache.query.{SqlQuery, ScanQuery} +import org.apache.ignite.cluster.ClusterNode +import org.apache.ignite.configuration.CacheConfiguration +import org.apache.ignite.lang.IgniteUuid +import org.apache.ignite.spark.impl.{IgniteSqlRDD, IgnitePartition, IgniteQueryIterator} import org.apache.spark.rdd.RDD +import org.apache.spark.{TaskContext, Partition} import scala.collection.JavaConversions._ -import scala.reflect.ClassTag -class IgniteRDD[R:ClassTag, K, V]( +class IgniteRDD[K, V] ( ic: IgniteContext[K, V], - qry: Query[R] -) extends RDD[R] (ic.sparkContext(), deps = Nil) { - override def compute(part: Partition, context: TaskContext): Iterator[R] = { - new IgniteQueryIterator[R, K, V](ic, part, qry) + cacheName: String, + cacheCfg: CacheConfiguration[K, V] +) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) { + + override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + val cache = ensureCache() + + val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(new ScanQuery[K, V]()).iterator() + + new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => { + (entry.getKey, entry.getValue) + }) } override protected def getPartitions: Array[Partition] = { - val parts = ic.ignite().affinity(ic.cacheName).partitions() + ensureCache() + + val parts = ic.ignite().affinity(cacheName).partitions() (0 until parts).map(new IgnitePartition(_)).toArray } override protected def getPreferredLocations(split: Partition): Seq[String] = { - ic.ignite().affinity(ic.cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList + ensureCache() + + ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList + } + + def query(typeName: String, sql: String, args: Any*): RDD[(K, V)] = { + val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql) + + qry.setArgs(args) + + new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry => (entry.getKey, entry.getValue)) + } + + def saveValues(rdd: RDD[V]) = { + rdd.foreachPartition(it => { + println("Using scala version: " + scala.util.Properties.versionString) + val ig = ic.ignite() + + ensureCache() + + val locNode = ig.cluster().localNode() + + val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode)) + + val streamer = ig.dataStreamer[Object, V](cacheName) + + try { + it.foreach(value => { + val key = affinityKeyFunc(value, node.orNull) + + println("Saving: " + key + ", " + value) + + streamer.addData(key, value) + }) + } + finally { + streamer.close() + } + }) + } + + def save(rdd: RDD[(K, V)]) = { + rdd.foreachPartition(it => { + println("Using scala version: " + scala.util.Properties.versionString) + val ig = ic.ignite() + + // Make sure to deploy the cache + ensureCache() + + val locNode = ig.cluster().localNode() + + val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode)) + + val streamer = ig.dataStreamer[K, V](cacheName) + + try { + it.foreach(tup => { + println("Saving: " + tup._1 + ", " + tup._2) + + streamer.addData(tup._1, tup._2) + }) + } + finally { + streamer.close() + } + }) + } + + private def affinityKeyFunc(value: V, node: ClusterNode): Object = { + IgniteUuid.randomUuid() } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala index 3932a26..cdb41d2 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala @@ -25,20 +25,23 @@ object IgniteProcessExample { val conf = new SparkConf().setAppName("Ignite processing example") val sc = new SparkContext(conf) - val partitioned = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _, "partitioned") + val ignite = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _) // Search for lines containing "Ignite". - val scanRdd = partitioned.scan((k, v) => v.contains("Ignite")) + val scanRdd = ignite.fromCache("partitioned") val processedRdd = scanRdd.filter(line => { println("Analyzing line: " + line) + line._2.contains("Ignite") true - }).map(_.getValue) + }).map(_._2) // Create a new cache for results. - val results = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _, "results") + val results = ignite.fromCache("results") - results.saveToIgnite(processedRdd) + results.saveValues(processedRdd) + + ignite.fromCache("indexed").query("Person", "age > ?", 20).collect() } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala index a7823f4..c74804e 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala @@ -27,7 +27,7 @@ object IgniteStoreExample { val conf = new SparkConf().setAppName("Ignite store example") val sc = new SparkContext(conf) - val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _, "partitioned") + val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _) val lines: RDD[String] = sc.textFile(args(0)).filter(line => { println("Read line: " + line) @@ -35,6 +35,6 @@ object IgniteStoreExample { true }) - ignite.saveToIgnite(lines) + ignite.fromCache("partitioned").saveValues(lines) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala index 07b24a9..b24ba50 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala @@ -17,16 +17,11 @@ package org.apache.ignite.spark.impl -import org.apache.ignite.cache.query.Query -import org.apache.ignite.spark.IgniteContext -import org.apache.spark.Partition +class IgniteQueryIterator[T, R] ( + cur: java.util.Iterator[T], + conv: (T) => R +) extends Iterator[R] { + override def hasNext: Boolean = cur.hasNext -class IgniteQueryIterator[R, K, V] ( - ic: IgniteContext[K, V], - part: Partition, - qry: Query[R] - ) extends Iterator[R] { - override def hasNext: Boolean = ??? - - override def next(): R = ??? + override def next(): R = conv(cur.next()) } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala new file mode 100644 index 0000000..e347c85 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.cache.query.Query +import org.apache.ignite.configuration.CacheConfiguration +import org.apache.ignite.spark.{IgniteAbstractRDD, IgniteContext} +import org.apache.spark.{TaskContext, Partition} + +import scala.reflect.ClassTag + +class IgniteSqlRDD[R: ClassTag, T, K, V]( + ic: IgniteContext[K, V], + cacheName: String, + cacheCfg: CacheConfiguration[K, V], + qry: Query[T], + conv: (T) => R +) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) { + override def compute(split: Partition, context: TaskContext): Iterator[R] = { + val it: java.util.Iterator[T] = ensureCache().query(qry).iterator() + + new IgniteQueryIterator[T, R](it, conv) + } + + override protected def getPartitions: Array[Partition] = { + Array(new IgnitePartition(0)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala deleted file mode 100644 index 484d0df..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spark.util - -import org.apache.ignite.lang.IgniteBiPredicate - -/** - * Peer deploy aware adapter for Java's `GridPredicate2`. - */ -class SerializablePredicate2[T1, T2](private val p: (T1, T2) => Boolean) extends IgniteBiPredicate[T1, T2] { - assert(p != null) - - /** - * Delegates to passed in function. - */ - def apply(e1: T1, e2: T2) = p(e1, e2) -} \ No newline at end of file
