There is already a discussion on the user list regarding this [1]. Let's continue there, as this question is not for the dev list. I will try to respond there shortly.
[1] http://apache-ignite-users.70518.x6.nabble.com/Embedded-mode-ignite-on-spark-td6942.html

-Val

On Sun, Aug 14, 2016 at 6:24 PM, percent620 <[email protected]> wrote:

> I will describe my issue in detail below. I run the same code in two
> scenarios: the first one gives the correct result, the second one does not.
>
> I have been studying Ignite recently but can't get the correct result for
> this. Hopefully someone can help me.
>
> ==============================
> 1. Run Ignite with spark-shell
> 1) Start spark-shell:
> ./spark-shell --jars /u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-core-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-spark/ignite-spark-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/cache-api-1.0.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/ignite-log4j-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/log4j-1.2.17.jar \
>   --packages org.apache.ignite:ignite-spark:1.6.0,org.apache.ignite:ignite-spring:1.6.0
>
> 2) Run the following code in spark-shell:
> import org.apache.ignite.spark._
> import org.apache.ignite.configuration._
>
> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
> val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
> val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
> println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
>
> //sharedRDD.saveValues(initalRDD.map(line=>line._1))
> sharedRDD.savePairs(initalRDD, true) // overwrite existing entries in the Ignite cache
> println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
> println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
>
> 3) The result is as follows:
> scala> import org.apache.ignite.spark._
> import org.apache.ignite.spark._
>
> scala> import org.apache.ignite.configuration._
> import org.apache.ignite.configuration._
>
> scala> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(),false)
> ic: org.apache.ignite.spark.IgniteContext[Int,Int] = org.apache.ignite.spark.IgniteContext@74e72ff4
>
> scala> val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
> sharedRDD: org.apache.ignite.spark.IgniteRDD[Int,Int] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:31
>
> scala> val initalRDD = sc.parallelize(1 to 100000,10).map(i => (i, i))
> initalRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:33
>
> scala> println("initalRDD.counter=/. " + initalRDD.count() +"\t partitionCounter=> " + initalRDD.partitions.size)
> initalRDD.counter=/. 100000 partitionCounter=> 10
>
> scala> sharedRDD.savePairs(initalRDD, true)//override cache on ignite
>
> scala> println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
> =====>totalIgniteEmbedCounter100000 igniteParitionCounter => 1024
>
> scala> println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
> =====>totalIgniteFilterConditionEmbedCounter50000
>
> *totalIgniteEmbedCounter is 100000: correct*
> *totalIgniteFilterConditionEmbedCounter is 50000: correct*
> ==============================
>
>
> 2. IDEA project
> 1) Create a Maven project in IDEA.
> 2) Add the Ignite Maven dependencies as above [1]:
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-core</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-indexing</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-visor-console</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-spring</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-spark</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-yarn</artifactId>
>     <version>1.6.0</version>
> </dependency>
>
> 3) Code for the IDEA project:
> package com
>
> import org.apache.ignite.configuration.IgniteConfiguration
> import org.apache.ignite.spark.IgniteContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object TestIgniteEmbedCache {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("TestIgniteEmbedCache")
>     val sc = new SparkContext(conf)
>
>     //val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration().setIncludeEventTypes(EventType.EVT_TASK_FAILED), false)
>     val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
>     val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
>     val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
>     println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
>
>     //sharedRDD.saveValues(initalRDD.map(line=>line._1))
>     sharedRDD.savePairs(initalRDD, true) // overwrite existing entries in the Ignite cache
>     println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
>     println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
>   }
> }
>
> 4) Run mvn clean assembly:assembly to build sparkignitedemo.jar.
>
> 5) Upload the jar to our Linux driver machine and submit it to the YARN
> cluster with the following spark-submit command:
>
> /u01/spark-1.6.0-hive/bin/spark-submit \
>   --driver-memory 8G \
>   --class com.TestIgniteEmbedCache \
>   --master yarn \
>   --executor-cores 5 \
>   --executor-memory 1000m \
>   --num-executors 10 \
>   --conf spark.rdd.compress=false \
>   --conf spark.shuffle.compress=false \
>   --conf spark.broadcast.compress=false \
>   /home/sparkignitedemo.jar
>
> 6) Result (this is the problem):
> *totalIgniteEmbedCounter is 40000 or 3000 (it seems random)*
> *totalIgniteFilterConditionEmbedCounter is 10000 or 2000 (random)*
> ==========================
>
> This result really confuses me: why does the same code produce two
> different results? Can anyone help me with this? I have been blocked on
> this issue for several days.
>
> Thanks!!!
>
> --
> View this message in context: http://apache-ignite-developers.2346864.n4.nabble.com/Embedded-mode-ignite-on-spark-for-cache-data-lost-issues-tp10685.html
> Sent from the Apache Ignite Developers mailing list archive at Nabble.com.
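The expected counts for both runs can be checked with plain Scala collections, with no Spark or Ignite involved. The sketch below assumes the same input as initalRDD, pairs (i, i) for i from 1 to 100000, and the same predicate as sharedRDD.filter(_._2 > 50000). The object and method names (ExpectedCounts, pairs, countAbove) are hypothetical and used only for illustration. Because the input data is deterministic, a total below 100000 in the YARN run would point to entries missing from the cache rather than to different input.

```scala
// Hypothetical helper: recomputes the counts from the report with plain
// Scala collections instead of Spark RDDs and an Ignite cache.
object ExpectedCounts {
  // Same key/value pairs as initalRDD: (1, 1), (2, 2), ..., (n, n)
  def pairs(n: Int): Seq[(Int, Int)] = (1 to n).map(i => (i, i))

  // Mirrors sharedRDD.filter(_._2 > threshold).count on the same data
  def countAbove(n: Int, threshold: Int): Int = pairs(n).count(_._2 > threshold)

  def main(args: Array[String]): Unit = {
    val n = 100000
    println(s"total=${pairs(n).size} filtered=${countAbove(n, 50000)}")
  }
}
```

Running main prints total=100000 filtered=50000, which matches the spark-shell result: only the values 50001 through 100000 pass the filter.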
