How did you run this? I couldn't run your query with 4 GB of RAM in 1.4, but it ran in 1.5.

Also, I recommend just dumping the data to Parquet on disk to evaluate, rather than using the in-memory cache, which is super slow and which we are thinking of removing or replacing with something else.

val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")
data.write.parquet("/scratch/rxin/tmp/alex")

val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)

On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

> Dear Spark developers,
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> Project Tungsten and released with Spark 1.4 (I am using the latest Spark
> from the repo, i.e. 1.5):
>
> https://github.com/apache/spark/pull/5725
>
> It says that the aggregation should be faster because it uses unsafe memory
> allocation and in-place updates. It was also presented at Spark Summit this
> summer:
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
> The following enables the new aggregation in spark-config:
>
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
>
> I wrote a simple piece of code that aggregates values by key. However, the
> time needed to execute the code does not depend on whether the new
> aggregation is on or off. Could you suggest how I can observe the
> improvement that the new aggregation provides? Could you write a code
> snippet that takes advantage of it?
>
> case class Counter(key: Int, value: Double)
>
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> df.persist()
> df.count()
>
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
> Best regards, Alexander
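
For anyone replaying this benchmark, one way to compare the two modes in a single spark-shell session is sketched below. It reuses the Parquet data and the timing pattern from the snippets above; the timed helper is just an illustrative name, and it assumes spark.sql.unsafe.enabled is picked up at query-planning time when set through sqlContext.setConf (spark.unsafe.offHeap would still need to be set before the context starts, e.g. via --conf or spark-defaults.conf).

import org.apache.spark.sql.functions.sum

// Illustrative helper mirroring the System.nanoTime pattern used in this thread.
def timed[A](label: String)(block: => A): A = {
  val t0 = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - t0) / 1e9} s")
  result
}

val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")

// Assumption: the SQL flag is read per query, so flipping it between runs is enough
// to exercise both aggregation paths on the same data.
for (unsafe <- Seq("false", "true")) {
  sqlContext.setConf("spark.sql.unsafe.enabled", unsafe)
  timed(s"groupBy/sum with unsafe=$unsafe") {
    df.groupBy("key").agg(sum("value")).count()
  }
}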