Not sure what's going on or how you measured the time, but the difference here is pretty big when I test on my laptop. Maybe you set the wrong config variables? (spark.sql.* settings are SQL conf variables that you set via sqlContext.setConf -- and in 1.5, they are consolidated into a single flag: spark.sql.tungsten.enabled. See below.)
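Concretely, a minimal sketch of what I mean by setting them (assuming a 1.5 build, where the consolidated flag exists):

// Sketch: spark.sql.* settings are SQL conf variables, so set them on
// sqlContext at runtime rather than in spark-defaults.conf.
sqlContext.setConf("spark.sql.tungsten.enabled", "true")   // the 1.5 default
// run the query, then flip it off and rerun for a comparison:
sqlContext.setConf("spark.sql.tungsten.enabled", "false")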
I ran with a 10m dataset (created by calling sample(true, 0.1) on the 100m dataset), since the 100m one takes too long on my laptop when Tungsten is off, so I didn't wait (it is 40s - 50s with Tungsten on).

val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex-10m")
val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)

On 1.5, with 8g driver memory and 8 cores: 5.48951

sqlContext.setConf("spark.sql.tungsten.enabled", "false")

Ran it again, and it took 25.127962.

On 1.4, with 8g driver memory and 8 cores: 25.583473

It's also possible that the benefit is smaller when you have an effectively infinite amount of memory (relative to the tiny dataset size), so GC happens less.

On Thu, Aug 20, 2015 at 7:00 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

> Did git pull :)
>
> Now I do get the difference in time between Tungsten unsafe on/off: it is
> 24-25 seconds (unsafe on) vs 32-26 seconds (unsafe off) for the example
> below.
>
> Why am I not getting the improvement advertised at Spark Summit (slide 23)?
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
> My dataset is 100M rows -- is it big enough to get the improvement? Am I
> using aggregate correctly?
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> df.persist()
> df.foreach { x => {} }
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.foreach { x => {} }
> println((System.nanoTime() - t) / 1e9)
>
> Unsafe on:
> spark.sql.codegen true
> spark.sql.unsafe.enabled true
> spark.unsafe.offHeap true
>
> Unsafe off:
> spark.sql.codegen false
> spark.sql.unsafe.enabled false
> spark.unsafe.offHeap false
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 5:43 PM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
> Please git pull :)
>
> On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
>
> I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe
> feature was added to Spark on April 29.)
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 5:26 PM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
> Yes - DataFrame and SQL are the same thing.
>
> Which version are you running? Spark 1.4 doesn't run Janino -- but you
> have a Janino exception?
>
> On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
>
> When I add the following option:
> spark.sql.codegen true
>
> Spark crashes on the "df.count" with a java.util.concurrent.ExecutionException
> (below). Are you sure that I need to set this flag to get unsafe? It looks
> like a SQL flag, and I don't use SQL.
>
> java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
>         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>         at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>         at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>         at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>         at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>         at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>         at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
>         at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>         at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
>         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
>         at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
>         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
>         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
>         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
>         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
>         at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
>         at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
>         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
>         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
>         at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
>         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
>         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
>         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
>         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
>         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
>         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
>         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
>         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
>         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
>         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
>         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
>         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
>         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
>         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
>         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
>         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
>         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
>         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
>         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
>         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
>         at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
>         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
>         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
>         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
>         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
>         at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>         ... 28 more
> Caused by: java.lang.ClassNotFoundException: Override
>         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:270)
>         at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
>         at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
>         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
>         ... 66 more
> Caused by: java.lang.ClassNotFoundException: Override
>         at java.lang.ClassLoader.findClass(ClassLoader.java:531)
>         at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
>         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
>         ... 73 more
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 4:22 PM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
> I think you might need to turn codegen on also in order for the unsafe
> stuff to work.
>
> On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
>
> Hi Reynold,
>
> Thank you for the suggestion. This code takes around 30 sec on my setup
> (5 workers with 32GB). My issue is that I don't see the change in time if
> I unset the unsafe flags. Could you explain why that might happen?
>
> On Aug 20, 2015, at 15:32, Reynold Xin <r...@databricks.com> wrote:
>
> I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:
>
> How did you run this? I couldn't run your query with 4G of RAM in 1.4,
> but in 1.5 it ran.
>
> Also I recommend just dumping the data to parquet on disk to evaluate,
> rather than using the in-memory cache, which is super slow and which we
> are thinking of removing or replacing with something else.
>
> val size = 100000000
> val partitions = 10
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   (util.Random.nextInt(size / repetitions),
>    util.Random.nextDouble)).toDF("key", "value")
>
> data.write.parquet("/scratch/rxin/tmp/alex")
>
> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
> On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
>
> Dear Spark developers,
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> Project Tungsten and released with Spark 1.4 (I am using the latest Spark
> from the repo, i.e. 1.5):
> https://github.com/apache/spark/pull/5725
> It says the aggregation should be faster because it uses Unsafe memory
> allocation and in-place updates. It was also presented at Spark Summit
> this summer:
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
> The following enables the new aggregation in spark-config:
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
>
> I wrote simple code that aggregates values by key. However, the time
> needed to execute the code does not depend on whether the new aggregation
> is on or off.
> Could you suggest how I can observe the improvement that the aggregation
> provides? Could you write a code snippet that takes advantage of the new
> aggregation?
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> df.persist()
> df.count()
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
> Best regards, Alexander
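For anyone wanting to reproduce this, here is a minimal end-to-end sketch combining the advice from the thread: write the data to parquet once, then time the aggregation with Tungsten on and off via sqlContext.setConf. It assumes a Spark 1.5 spark-shell (sc and sqlContext predefined); the /tmp output path is an illustrative stand-in for Reynold's /scratch path, and the timing helper is not from the thread, just a wrapper around the same System.nanoTime pattern.

import sqlContext.implicits._   // for toDF; auto-imported in the shell

val size = 10000000             // 10m rows, roughly Reynold's sampled dataset
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")

// Dump to parquet once, so every timed run scans the same on-disk data
// instead of the in-memory cache.
data.write.parquet("/tmp/tungsten-bench")   // illustrative path
val df = sqlContext.read.parquet("/tmp/tungsten-bench")

// Small timing helper around the System.nanoTime pattern used above.
def timed(label: String)(f: => Unit): Unit = {
  val t = System.nanoTime()
  f
  println(s"$label: ${(System.nanoTime() - t) / 1e9} s")
}

sqlContext.setConf("spark.sql.tungsten.enabled", "true")
timed("tungsten on") { df.groupBy("key").sum("value").queryExecution.toRdd.count() }

sqlContext.setConf("spark.sql.tungsten.enabled", "false")
timed("tungsten off") { df.groupBy("key").sum("value").queryExecution.toRdd.count() }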