Re: GraphX: Get edges for a vertex

2014-11-13 Thread Takeshi Yamamuro
Hi,

I think that there are two solutions;

1. Invalid edges send Iterator.empty messages in sendMsg of the Pregel API.
These messages make no effect on corresponding vertices.

2. Use GraphOps.(collectNeighbors/collectNeighborIds), not the Pregel API
 so as to
handle active edge lists by yourself.
I think that it is hard to handle  edge active lists in Pregel APIs.

Thought?

Best regards,
takeshi

On Fri, Nov 14, 2014 at 7:32 AM, Daniil Osipov daniil.osi...@shazam.com
wrote:

 Hello,

 I'm attempting to implement a clustering algorithm on top of Pregel
 implementation in GraphX, however I'm hitting a wall. Ideally, I'd like to
 be able to get all edges for a specific vertex, since they factor into the
 calculation. My understanding was that sendMsg function would receive all
 relevant edges in participating vertices (all initially, declining as they
 converge and stop changing state), and I was planning to keep vertex edges
 associated to each vertex and propagate to other vertices that need to know
 about these edges.

 What I'm finding is that not all edges get iterated on by sendMsg before
 sending messages to vprog. Even if I try to keep track of edges, I don't
 account all of them, leading to incorrect results.

 The graph I'm testing on has edges between all nodes, one for each
 direction, and I'm using EdgeDirection.Both.

 Anyone seen something similar, and have some suggestions?
 Dan



Re: Learning GraphX Questions

2015-02-19 Thread Takeshi Yamamuro
Hi,

Vertices are simply hash-partitioned by spark.HashPartitioner, so
you easily calculate partition ids by yourself.

Also, you can type the lines to check ids;

import org.apache.spark.graphx._

graph.vertices.mapPartitionsWithIndex { (pid, iter) =
  val vids = Array.newBuilder[VertexId]
  for (d - iter) vids += d._1
  Iterator((pid, vids.result))
}
.map(d = sPID:${d._1} IDs:${d._2.toSeq.toString})
.collect
.foreach(println)








On Thu, Feb 19, 2015 at 12:31 AM, Matthew Bucci mrbucci...@gmail.com
wrote:

 Thanks for all the responses so far! I have started to understand the
 system more, but I just had another question while I was going along. Is
 there a way to check the individual partitions of an RDD? For example, if I
 had a graph with vertices a,b,c,d and it was split into 2 partitions could
 I check which vertices belonged in partition 1 and parition 2?

 Thank You,
 Matthew Bucci

 On Fri, Feb 13, 2015 at 10:58 PM, Ankur Dave ankurd...@gmail.com wrote:

 At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote:
  1) How do you actually run programs in GraphX? At the moment I've been
 doing
  everything live through the shell, but I'd obviously like to be able to
 work
  on it by writing and running scripts.

 You can create your own projects that build against Spark and GraphX
 through a Maven dependency [1], then run those applications using the
 bin/spark-submit script included with Spark [2].

 These guides assume you already know how to do this using your preferred
 build tool (SBT or Maven). In short, here's how to do it with SBT:

 1. Install SBT locally (`brew install sbt` on OS X).

 2. Inside your project directory, create a build.sbt file listing Spark
 and GraphX as a dependency, as in [3].

 3. Run `sbt package` in a shell.

 4. Pass the JAR in your_project_dir/target/scala-2.10/ to
 bin/spark-submit.

 [1]
 http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark
 [2] http://spark.apache.org/docs/latest/submitting-applications.html
 [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4

  2) Is there a way to check the status of the partitions of a graph? For
  example, I want to determine for starters if the number of partitions
  requested are always made, like if I ask for 8 partitions but only have
 4
  cores what happens?

 You can look at `graph.vertices` and `graph.edges`, which are both RDDs,
 so you can do for example: graph.vertices.partitions

  3) Would I be able to partition by vertex instead of edges, even if I
 had to
  write it myself? I know partitioning by edges is favored in a majority
 of
  the cases, but for the sake of research I'd like to be able to do both.

 If you pass PartitionStrategy.EdgePartition1D, this will partition edges
 by their source vertices, so all edges with the same source will be
 co-partitioned, and the communication pattern will be similar to
 vertex-partitioned (edge-cut) systems like Giraph.

  4) Is there a better way to time processes outside of using built-in
 unix
  timing through the logs or something?

 I think the options are Unix timing, log file timestamp parsing, looking
 at the web UI, or writing timing code within your program
 (System.currentTimeMillis and System.nanoTime).

 Ankur





-- 
---
Takeshi Yamamuro


Re: [GraphX] Excessive value recalculations during aggregateMessages cycles

2015-02-15 Thread Takeshi Yamamuro
 to append the
 cycle number (the second argument) into a log file named after the vertex.
 What ends up getting dumped into the log file for every vertex (in the
 exact same pattern) is
 ```
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 2
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 3
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 2
 Cycle: 4
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 2
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 2
 Cycle: 3
 Cycle: 5
 ```

 Any ideas about what I might be doing wrong for the caching? And how I
 can avoid re-calculating so many of the values.


 Kyle






-- 
---
Takeshi Yamamuro


Re: GraphX Snapshot Partitioning

2015-03-14 Thread Takeshi Yamamuro
Large edge partitions could cause java.lang.OutOfMemoryError, and then
spark tasks fails.

FWIW, each edge partition can have at most 2^32 edges because 64-bit vertex
IDs are
mapped into 32-bit ones in each partitions.
If #edges is over the limit, graphx could throw
ArrayIndexOutOfBoundsException,
or something. So, each partition can have more edges than you expect.





On Wed, Mar 11, 2015 at 11:42 PM, Matthew Bucci mrbucci...@gmail.com
wrote:

 Hi,

 Thanks for the response! That answered some questions I had, but the last
 one I was wondering is what happens if you run a partition strategy and one
 of the partitions ends up being too large? For example, let's say
 partitions can hold 64MB (actually knowing the maximum possible size of a
 partition would probably also be helpful to me). You try to partition the
 edges of a graph to 3 separate partitions but the edges in the first
 partition end up being 80MB worth of edges so it cannot all fit in the
 first partition . Would the extra 16MB flood over into a new 4th partition
 or would the system try to split it so that the 1st and 4th partition are
 both at 40MB, or would the partition strategy just fail with a memory
 error?

 Thank You,
 Matthew Bucci

 On Mon, Mar 9, 2015 at 11:07 PM, Takeshi Yamamuro linguin@gmail.com
 wrote:

 Hi,

 Vertices are simply hash-paritioned by their 64-bit IDs, so
 they are evenly spread over parititons.

 As for edges, GraphLoader#edgeList builds edge paritions
 through hadoopFile(), so the initial parititons depend
 on InputFormat#getSplits implementations
 (e.g, partitions are mostly equal to 64MB blocks for HDFS).

 Edges can be re-partitioned by ParititonStrategy;
 a graph is partitioned considering graph structures and
 a source ID and a destination ID are used as partition keys.
 The partitions might suffer from skewness depending
 on graph properties (hub nodes, or something).

 Thanks,
 takeshi


 On Tue, Mar 10, 2015 at 2:21 AM, Matthew Bucci mrbucci...@gmail.com
 wrote:

 Hello,

 I am working on a project where we want to split graphs of data into
 snapshots across partitions and I was wondering what would happen if one
 of
 the snapshots we had was too large to fit into a single partition. Would
 the
 snapshot be split over the two partitions equally, for example, and how
 is a
 single snapshot spread over multiple partitions?

 Thank You,
 Matthew Bucci



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Snapshot-Partitioning-tp21977.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 ---
 Takeshi Yamamuro





-- 
---
Takeshi Yamamuro


Re: [GRAPHX] could not process graph with 230M edges

2015-03-14 Thread Takeshi Yamamuro
Hi,

If you have heap problems in spark/graphx, it'd be better to split
partitions
into smaller ones so as to fit the partition on memory.

On Sat, Mar 14, 2015 at 12:09 AM, Hlib Mykhailenko 
hlib.mykhaile...@inria.fr wrote:

 Hello,

 I cannot process graph with 230M edges.
 I cloned apache.spark, build it and then tried it on cluster.

 I used Spark Standalone Cluster:
 -5 machines (each has 12 cores/32GB RAM)
 -'spark.executor.memory' ==  25g
 -'spark.driver.memory' == 3g


 Graph has 231359027 edges. And its file weights 4,524,716,369 bytes.
 Graph is represented in text format:
 source vertex id destination vertex id

 My code:

 object Canonical {

   def main(args: Array[String]) {

 val numberOfArguments = 3
 require(args.length == numberOfArguments, sWrong argument number.
 Should be $numberOfArguments .

  |Usage: path_to_grpah partiotioner_name minEdgePartitions
 .stripMargin)

 var graph: Graph[Int, Int] = null
 val nameOfGraph = args(0).substring(args(0).lastIndexOf(/) + 1)
 val partitionerName = args(1)
 val minEdgePartitions = args(2).toInt

 val sc = new SparkContext(new SparkConf()
.setSparkHome(System.getenv(SPARK_HOME))
.setAppName(s partitioning | $nameOfGraph |
 $partitionerName | $minEdgePartitions parts )

  .setJars(SparkContext.jarOfClass(this.getClass).toList))

 graph = GraphLoader.edgeListFile(sc, args(0), false, edgeStorageLevel
 = StorageLevel.MEMORY_AND_DISK,
vertexStorageLevel
 = StorageLevel.MEMORY_AND_DISK, minEdgePartitions = minEdgePartitions)
 graph =
 graph.partitionBy(PartitionStrategy.fromString(partitionerName))
 println(graph.edges.collect.length)
 println(graph.vertices.collect.length)
   }
 }

 After I run it I encountered number of java.lang.OutOfMemoryError: Java
 heap space errors and of course I did not get a result.

 Do I have problem in the code? Or in cluster configuration?

 Because it works fine for relatively small graphs. But for this graph it
 never worked. (And I do not think that 230M edges is too big data)


 Thank you for any advise!



 --
 Cordialement,
 *Hlib Mykhailenko*
 Doctorant à INRIA Sophia-Antipolis Méditerranée
 http://www.inria.fr/centre/sophia/
 2004 Route des Lucioles BP93
 06902 SOPHIA ANTIPOLIS cedex




-- 
---
Takeshi Yamamuro


Re: Basic GraphX deployment and usage question

2015-03-16 Thread Takeshi Yamamuro
Hi,

Your're right, that is, graphx has already be included in a spark default
package.
As a first step, 'Analytics' seems to be suitable for your objective.
# ./bin/run-example graphx.Analytics pagerank graph-file





On Tue, Mar 17, 2015 at 2:21 AM, Khaled Ammar khaled.am...@gmail.com
wrote:

 Hi,

 I'm very new to Spark and GraphX. I downloaded and configured Spark on a
 cluster, which uses Hadoop 1.x. The master UI shows all workers. The
 example command run-example SparkPi works fine and completes
 successfully.

 I'm interested in GraphX. Although the documentation says it is built-in
 with Spark, I could not find any GraphX jar files under lib. I also
 wonder if any of the algorithms mentioned in GraphX programming guide page
 is pre-combiled and available for testing.

 My main objective is to ensure that at least one correct graph application
 is working with no errors using GraphX, before I start writing my own.

 --
 Thanks,
 -Khaled




-- 
---
Takeshi Yamamuro


Re: GraphX Snapshot Partitioning

2015-03-09 Thread Takeshi Yamamuro
Hi,

Vertices are simply hash-paritioned by their 64-bit IDs, so
they are evenly spread over parititons.

As for edges, GraphLoader#edgeList builds edge paritions
through hadoopFile(), so the initial parititons depend
on InputFormat#getSplits implementations
(e.g, partitions are mostly equal to 64MB blocks for HDFS).

Edges can be re-partitioned by ParititonStrategy;
a graph is partitioned considering graph structures and
a source ID and a destination ID are used as partition keys.
The partitions might suffer from skewness depending
on graph properties (hub nodes, or something).

Thanks,
takeshi


On Tue, Mar 10, 2015 at 2:21 AM, Matthew Bucci mrbucci...@gmail.com wrote:

 Hello,

 I am working on a project where we want to split graphs of data into
 snapshots across partitions and I was wondering what would happen if one of
 the snapshots we had was too large to fit into a single partition. Would
 the
 snapshot be split over the two partitions equally, for example, and how is
 a
 single snapshot spread over multiple partitions?

 Thank You,
 Matthew Bucci



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Snapshot-Partitioning-tp21977.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
---
Takeshi Yamamuro


Re: Registering custom UDAFs with HiveConetxt in SparkSQL, how?

2015-03-26 Thread Takeshi Yamamuro
I think it is not `sqlContext` but hiveContext because `create temporary
function` is not supported in SQLContext.

On Wed, Mar 25, 2015 at 5:58 AM, Jon Chase jon.ch...@gmail.com wrote:

 Shahab -

 This should do the trick until Hao's changes are out:


 sqlContext.sql(create temporary function foobar as
 'com.myco.FoobarUDAF');

 sqlContext.sql(select foobar(some_column) from some_table);


 This works without requiring to 'deploy' a JAR with the UDAF in it - just
 make sure the UDAF is in your project's classpath.




 On Tue, Mar 10, 2015 at 8:21 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Oh, sorry, my bad, currently Spark SQL doesn’t provide the user
 interface for UDAF, but it can work seamlessly with Hive UDAF (via
 HiveContext).



 I am also working on the UDAF interface refactoring, after that we can
 provide the custom interface for extension.



 https://github.com/apache/spark/pull/3247





 *From:* shahab [mailto:shahab.mok...@gmail.com]
 *Sent:* Wednesday, March 11, 2015 1:44 AM
 *To:* Cheng, Hao
 *Cc:* user@spark.apache.org
 *Subject:* Re: Registering custom UDAFs with HiveConetxt in SparkSQL,
 how?



 Thanks Hao,

 But my question concerns UDAF (user defined aggregation function ) not
 UDTF( user defined type function ).

 I appreciate if you could point me to some starting point on UDAF
 development in Spark.



 Thanks

 Shahab

 On Tuesday, March 10, 2015, Cheng, Hao hao.ch...@intel.com wrote:

  Currently, Spark SQL doesn’t provide interface for developing the
 custom UDTF, but it can work seamless with Hive UDTF.



 I am working on the UDTF refactoring for Spark SQL, hopefully will
 provide an Hive independent UDTF soon after that.



 *From:* shahab [mailto:shahab.mok...@gmail.com]
 *Sent:* Tuesday, March 10, 2015 5:44 PM
 *To:* user@spark.apache.org
 *Subject:* Registering custom UDAFs with HiveConetxt in SparkSQL, how?



 Hi,



 I need o develop couple of UDAFs and use them in the SparkSQL. While UDFs
 can be registered as a function in HiveContext, I could not find any
 documentation of how UDAFs can be registered in the HiveContext?? so far
 what I have found is to make a JAR file, out of developed UDAF class, and
 then deploy the JAR file to SparkSQL .



 But is there any way to avoid deploying the jar file and register it
 programmatically?





 best,

 /Shahab





-- 
---
Takeshi Yamamuro


Re: Quick GraphX gutcheck

2015-04-01 Thread Takeshi Yamamuro
hi,

Yes, you're right.
Original VertexIds are used to join them in VeretexRDD#xxxJoin.

On Thu, Apr 2, 2015 at 7:31 AM, hokiegeek2 soozandjohny...@gmail.com
wrote:

 Hi Everyone,

 Quick (hopefully) and silly (likely) question--the VertexId can be used to
 join the VertexRDD generated from Graph.vertices with a transformed RDD
 where the keys are vertexIds from the original graph, correct?

 --John




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Quick-GraphX-gutcheck-tp22345.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
---
Takeshi Yamamuro


Re: [Spark1.3] UDF registration issue

2015-04-13 Thread Takeshi Yamamuro
Hi,

It's a syntax error in Spark-1.3.
The next release of spark supports the kind of UDF calls in DataFrame.
See a link below.

https://issues.apache.org/jira/browse/SPARK-6379


On Sat, Apr 11, 2015 at 3:30 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Hi, I'm running into some trouble trying to register a UDF:

 scala sqlContext.udf.register(strLen, (s: String) = s.length())
 res22: org.apache.spark.sql.UserDefinedFunction = 
 UserDefinedFunction(function1,IntegerType)

 scala cleanProcessDF.withColumn(dii,strLen(col(di)))
 console:33: error: not found: value strLen
   cleanProcessDF.withColumn(dii,strLen(col(di)))

 ​

 Where cleanProcessDF is a dataframe
 Is my syntax wrong? Or am I missing an import of some sort?




-- 
---
Takeshi Yamamuro


Re: Udf's in spark

2015-07-23 Thread Takeshi Yamamuro
Sure and sparksql supports Hive UDFs.
ISTM that the UDF 'DATE_FORMAT' is just not registered in your metastore?
Did you say 'CREATE FUNCTION' in advance?

Thanks,

On Tue, Jul 14, 2015 at 6:30 PM, Ravisankar Mani rrav...@gmail.com wrote:

 Hi Everyone,

 As mentioned in Spark sQL programming guide, Spark SQL support Hive UDFs.
 I have built the UDF's in hive meta store. It working perfectly in hive
 connection. But it is not working in spark (java.lang.RuntimeException:
 Couldn't find function DATE_FORMAT).

 Could you please help how to use this hive UDF' s in sprak?


 Regards,

 Ravisankar M R




-- 
---
Takeshi Yamamuro


Re: Switching broadcast mechanism from torrrent

2016-06-07 Thread Takeshi Yamamuro
Hi,

Since `HttpBroadcastFactory` has already been removed in master, so
you cannot use the broadcast mechanism in future releases.

Anyway, I couldn't find a root cause only from the stacktraces...

// maropu




On Mon, Jun 6, 2016 at 2:14 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I've set  spark.broadcast.factory to
> org.apache.spark.broadcast.HttpBroadcastFactory and it indeed resolve my
> issue.
>
> I'm creating a dataframe which creates a broadcast variable internally and
> then fails due to the torrent broadcast with the following stacktrace:
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_3_piece0 of broadcast_3
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)
>
> I'm using spark 1.6.0 on CDH 5.7
>
> Thanks,
> Daniel
>
>
> On Wed, Jun 1, 2016 at 5:52 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> I found spark.broadcast.blockSize but no parameter to switch broadcast
>> method.
>>
>> Can you describe the issues with torrent broadcast in more detail ?
>>
>> Which version of Spark are you using ?
>>
>> Thanks
>>
>> On Wed, Jun 1, 2016 at 7:48 AM, Daniel Haviv <
>> daniel.ha...@veracity-group.com> wrote:
>>
>>> Hi,
>>> Our application is failing due to issues with the torrent broadcast, is
>>> there a way to switch to another broadcast method ?
>>>
>>> Thank you.
>>> Daniel
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Cleaning spark memory

2016-06-10 Thread Takeshi Yamamuro
Hi,

If you remove all cached data, please use `SQLContext#clearCache`.

// maropu

On Sat, Jun 11, 2016 at 3:18 AM, Cesar Flores <ces...@gmail.com> wrote:

>
> Hello:
>
> Sometimes I cache data frames to memory that I forget to unpersist, losing
> the variable reference in the process.
>
> Is there a way of: (i) is there a way of recovering references to data
> frames that are still persisted in memory OR (ii) a way of just unpersist
> all spark cached variables?
>
>
> Thanks
> --
> Cesar Flores
>



-- 
---
Takeshi Yamamuro


Re: Saving Parquet files to S3

2016-06-10 Thread Takeshi Yamamuro
Hi,

You'd better off `setting parquet.block.size`.

// maropu

On Thu, Jun 9, 2016 at 7:48 AM, Daniel Siegmann <daniel.siegm...@teamaol.com
> wrote:

> I don't believe there's anyway to output files of a specific size. What
> you can do is partition your data into a number of partitions such that the
> amount of data they each contain is around 1 GB.
>
> On Thu, Jun 9, 2016 at 7:51 AM, Ankur Jain <ankur.j...@yash.com> wrote:
>
>> Hello Team,
>>
>>
>>
>> I want to write parquet files to AWS S3, but I want to size each file
>> size to 1 GB.
>>
>> Can someone please guide me on how I can achieve the same?
>>
>>
>>
>> I am using AWS EMR with spark 1.6.1.
>>
>>
>>
>> Thanks,
>>
>> Ankur
>> Information transmitted by this e-mail is proprietary to YASH
>> Technologies and/ or its Customers and is intended for use only by the
>> individual or entity to which it is addressed, and may contain information
>> that is privileged, confidential or exempt from disclosure under applicable
>> law. If you are not the intended recipient or it appears that this mail has
>> been forwarded to you without proper authority, you are notified that any
>> use or dissemination of this information in any manner is strictly
>> prohibited. In such cases, please notify us immediately at i...@yash.com
>> and delete this mail from your records.
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Catalyst optimizer cpu/Io cost

2016-06-10 Thread Takeshi Yamamuro
Hi,

There no way to retrieve that information in spark.
In fact,  the current optimizer only consider the byte size of outputs in
LogicalPlan.
Related code can be found in
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L90

If you want to know more about catalyst, you can check the Yin Huai's slide
in spark summit 2016.
https://spark-summit.org/2016/speakers/yin-huai/
# Note: the slide is not available now, and it seems it will be in a few
weeks.

// maropu


On Fri, Jun 10, 2016 at 3:29 PM, Srinivasan Hariharan02 <
srinivasan_...@infosys.com> wrote:

> Hi,,
>
> How can I get spark sql query cpu and Io cost after optimizing for the
> best logical plan. Is there any api to retrieve this information?. If
> anyone point me to the code where actually cpu and Io cost computed in
> catalyst module.
>
> *Regards,*
> *Srinivasan Hariharan*
> *+91-9940395830 <%2B91-9940395830>*
>
>
>
>



-- 
---
Takeshi Yamamuro


Re: Custom positioning/partitioning Dataframes

2016-06-03 Thread Takeshi Yamamuro
Hi,

I'm afraid spark has no explicit api to set custom partitioners in df for
now.

// maropu

On Sat, Jun 4, 2016 at 1:09 AM, Nilesh Chakraborty <nil...@nileshc.com>
wrote:

> Hi,
>
> I have a domain-specific schema (RDF data with vertical partitioning, ie.
> one table per property) and I want to instruct SparkSQL to keep
> semantically
> closer property tables closer together, that is, group dataframes together
> into different nodes (or at least encourage it somehow) so that tables that
> are most frequently joined together are located locally together.
>
> Any thoughts on how I can do this with Spark? Any internal hack ideas are
> welcome too. :)
>
> Cheers,
> Nilesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-positioning-partitioning-Dataframes-tp27084.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Strategies for propery load-balanced partitioning

2016-06-03 Thread Takeshi Yamamuro
Hi,

you can control this kinda issue in the comming v2.0.
See https://www.mail-archive.com/user@spark.apache.org/msg51603.html

// maropu


On Sat, Jun 4, 2016 at 10:23 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Saif!
>
>
>
> When you say this happens with spark-csv, are the files gzipped by any
> chance? GZip is non-splittable so if you’re seeing skew simply from loading
> data it could be you have some extremely large gzip files. So for a single
> stage job you will have those tasks lagging compared to the smaller gzips.
> As you already said, the option there would be to repartition at the
> expense of shuffling. If you’re seeing this with parquet files, what do the
> individual part-* files look like (size, compression type, etc.)?
>
>
>
> Thanks,
>
> Silvio
>
>
>
> *From: *"saif.a.ell...@wellsfargo.com" <saif.a.ell...@wellsfargo.com>
> *Date: *Friday, June 3, 2016 at 8:31 AM
> *To: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Strategies for propery load-balanced partitioning
>
>
>
> Hello everyone!
>
>
>
> I was noticing that, when reading parquet files or actually any kind of
> source data frame data (spark-csv, etc), default partinioning is not fair.
>
> Action tasks usually act very fast on some partitions and very slow on
> some others, and frequently, even fast on all but last partition (which
> looks like it reads +50% of the data input size).
>
>
>
> I notice that each task is loading some portion of the data, say 1024MB
> chunks, and some task loading 20+GB of data.
>
>
>
> Applying repartition strategies solve this issue properly and general
> performance is increased considerably, but for very large dataframes,
> repartitioning is a costly process.
>
>
>
> In short, what are the available strategies or configurations that help
> reading from disk or hdfs with proper executor-data-distribution??
>
>
>
> If this needs to be more specific, I am strictly focused on PARQUET files
> rom HDFS. I know there are some MIN
>
>
>
> Really appreciate,
>
> Saif
>
>
>



-- 
---
Takeshi Yamamuro


Re: How to generate seeded random numbers in GraphX Pregel API vertex procedure?

2016-06-03 Thread Takeshi Yamamuro
Hi,

yea, we have no simple way to do that in GraphX because the GraphX class
has both vertex and edge rdds and
we cannot simply implement mapPartitions there to keep vertex/edge
semantics inside.
Another idea is to generate edge files by using RDD#mapPartitions and write
them into HDFS, and then
you use GraphLoader#edgeListFile to load them.

// maropu

On Thu, Jun 2, 2016 at 11:20 PM, Roman Pastukhov <metaignat...@gmail.com>
wrote:

> As far as I understand, best way to generate seeded random numbers in
> Spark is to use mapPartititons with a seeded Random instance for each
> partition.
> But graph.pregel in GraphX does not have anything similar to mapPartitions.
>
> Can something like this be done in GraphX Pregel API?
>



-- 
---
Takeshi Yamamuro


Re: how to investigate skew and DataFrames and RangePartitioner

2016-06-14 Thread Takeshi Yamamuro
Hi,

I'm afraid there is currently no api to define RangeParititoner in df.

// maropu

On Tue, Jun 14, 2016 at 5:04 AM, Peter Halliday <pjh...@cornell.edu> wrote:

> I have two questions
>
> First,I have a failure when I write parquet from Spark 1.6.1 on Amazon EMR
> to S3.  This is full batch, which is over 200GB of source data.  The
> partitioning is based on a geographic identifier we use, and also a date we
> got the data.  However, because of geographical density we certainly could
> be hitting the fact we are getting tiles too dense.  I’m trying to figure
> out how to figure out the size of the file it’s trying to write out.
>
> Second, We use to use RDDs and RangePartitioner for task partitioning.
> However, I don’t see this available in DataFrames.  How does one achieve
> this now.
>
> Peter Halliday
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Is there a limit on the number of tasks in one job?

2016-06-13 Thread Takeshi Yamamuro
Hi,

You can control an initial num. of partitions (tasks) in v2.0.
https://www.mail-archive.com/user@spark.apache.org/msg51603.html

// maropu


On Tue, Jun 14, 2016 at 7:24 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Have you looked at spark GUI to see what it is waiting for. is that
> available memory. What is the resource manager you are using?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 13 June 2016 at 20:45, Khaled Hammouda <khaled.hammo...@kik.com> wrote:
>
>> Hi Michael,
>>
>> Thanks for the suggestion to use Spark 2.0 preview. I just downloaded the
>> preview and tried using it, but I’m running into the exact same issue.
>>
>> Khaled
>>
>> On Jun 13, 2016, at 2:58 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> You might try with the Spark 2.0 preview.  We spent a bunch of time
>> improving the handling of many small files.
>>
>> On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda <
>> khaled.hammo...@kik.com> wrote:
>>
>>> I'm trying to use Spark SQL to load json data that are split across
>>> about 70k
>>> files across 24 directories in hdfs, using
>>> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
>>>
>>> This doesn't seem to work for some reason, I get timeout errors like the
>>> following:
>>>
>>> ---
>>> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
>>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 has been quiet for
>>> 12
>>> ms while there are outstanding requests. Assuming connection is dead;
>>> please
>>> adjust spark.network.timeout if this is wrong.
>>> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
>>> outstanding when connection from
>>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 is closed
>>> ...
>>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>>> ...
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>>> [120 seconds]
>>> --
>>>
>>> I don't want to start tinkering with increasing timeouts yet. I tried to
>>> load just one sub-directory, which contains around 4k files, and this
>>> seems
>>> to work fine. So I thought of writing a loop where I load the json files
>>> from each sub-dir and then unionAll the current dataframe with the
>>> previous
>>> dataframe. However, this also fails because apparently the json files
>>> don't
>>> have the exact same schema, causing this error:
>>>
>>> ---
>>> Traceback (most recent call last):
>>>   File "/home/hadoop/load_json.py", line 65, in 
>>> df = df.unionAll(hrdf)
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
>>> line 998, in unionAll
>>>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
>>> line 813, in __call__
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>>> 51, in deco
>>> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
>>> ---
>>>
>>> I'd like to know what's preventing Spark from loading 70k files the same
>>> way
>>> it's loading 4k files?
>>>
>>> To give you some idea about my setup and data:
>>> - ~70k files across 24 directories in HDFS
>>> - Each directory contains 3k files on average
>>> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
>>> available to YARN
>>> - Spark 1.6.1
>>>
>>> Thanks.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> <http://nabble.com>.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Dataset - reduceByKey

2016-06-07 Thread Takeshi Yamamuro
Seems you can see docs for 2.0 for now;
https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/spark-2.0.0-SNAPSHOT-2016_06_07_07_01-1e2c931-docs/

// maropu

On Tue, Jun 7, 2016 at 11:40 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> It would also be nice if there was a better example of joining two
> Datasets. I am looking at the documentation here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
> a little bit sparse - is there a better documentation source?
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Hello.
>>
>> I am looking at the option of moving RDD based operations to Dataset
>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>> What would the equivalent be in the Dataset interface - I do not see a
>> simple reduceByKey replacement.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: GraphX Java API

2016-05-29 Thread Takeshi Yamamuro
Hi,

Have you checked GraphFrame?
See the related discussion: See
https://issues.apache.org/jira/browse/SPARK-3665

// maropu

On Fri, May 27, 2016 at 8:22 PM, Santoshakhilesh <
santosh.akhil...@huawei.com> wrote:

> GraphX APis are available only in Scala. If you need to use GraphX you
> need to switch to Scala.
>
>
>
> *From:* Kumar, Abhishek (US - Bengaluru) [mailto:
> abhishekkuma...@deloitte.com]
> *Sent:* 27 May 2016 19:59
> *To:* user@spark.apache.org
> *Subject:* GraphX Java API
>
>
>
> Hi,
>
>
>
> We are trying to consume the Java API for GraphX, but there is no
> documentation available online on the usage or examples. It would be great
> if we could get some examples in Java.
>
>
>
> Thanks and regards,
>
>
>
> *Abhishek Kumar*
>
> Products & Services | iLab
>
> Deloitte Consulting LLP
>
> Block ‘C’, Divyasree Technopolis, Survey No.: 123 & 132/2, Yemlur Post,
> Yemlur, Bengaluru – 560037, Karnataka, India
>
> Mobile: +91 7736795770
>
> abhishekkuma...@deloitte.com | www.deloitte.com
>
>
>
> Please consider the environment before printing.
>
>
>
>
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> v.E.1
>
>
>
>
>
>
>
>
>



-- 
---
Takeshi Yamamuro


Re: FAILED_TO_UNCOMPRESS Error - Spark 1.3.1

2016-05-30 Thread Takeshi Yamamuro
Hi,

This is a known issue.
You need to check a related JIRA ticket:
https://issues.apache.org/jira/browse/SPARK-4105

// maropu

On Mon, May 30, 2016 at 7:51 PM, Prashant Singh Thakur <
prashant.tha...@impetus.co.in> wrote:

> Hi,
>
>
>
> We are trying to use Spark Data Frames for our use case where we are
> getting this exception.
>
> The parameters used are listed below. Kindly suggest if we are missing
> something.
>
> Version used is Spark 1.3.1
>
> Jira is still showing this issue as Open
> https://issues.apache.org/jira/browse/SPARK-4105
>
> Kindly suggest if there is workaround .
>
>
>
> Exception :
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 88 in stage 40.0 failed 4 times, most recent failure: Lost
> task 88.3 in stage 40.0 : java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>
>   at
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>
>   at org.xerial.snappy.SnappyNative.rawUncompress(Native
> Method)
>
>   at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
>
>   at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
>
>   at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
>
>   at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>
>   at
> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
>
>   at
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$7.apply(TorrentBroadcast.scala:213)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$7.apply(TorrentBroadcast.scala:213)
>
>   at scala.Option.map(Option.scala:145)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:213)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
>
>   at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
>   at
> org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
>   at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> Parameters Changed :
>
> spark.akka.frameSize=50
>
> spark.shuffle.memoryFraction=0.4
>
> spark.storage.memoryFraction=0.5
>
> spark.worker.timeout=12
>
> spark.storage.blockManagerSlaveTimeoutMs=12
>
> spark.akka.heartbeat.pauses=6000
>
> spark.akka.heartbeat.interval=1000
>
> spark.ui.port=21000
>
> spark.port.maxRetries=50
>
> spark.executor.memory=10G
>
> spark.executor.instances=100
>
> spark.driver.memory=8G
>
> spark.executor.cores=2
>
> spark.shuffle.compress=true
>
> spark.io.compression.codec=snappy
>
> spark.broadcast.compress=true
>
> spark.rdd.compress=true
>
> spark.worker.cleanup.enabled=true
>
> spark.worker.cleanup.interval=600
>
> spark.worker.cleanup.appDataTtl=600
>
> spark.shuffle.consolidateFiles=true
>
> spark.yarn.preserve.staging.files=false
>
> spark.yarn.driver.memoryOverhead=1024
>
> spark.yarn.executor.memoryOverhead=1024
>
>
>
> Best Regards,
>
> Prashant Singh Thakur
>
> Mobile: +91-9740266522
>
>
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>



-- 
---
Takeshi Yamamuro


Re: equvalent beewn join sql and data frame

2016-05-30 Thread Takeshi Yamamuro
Hi,

The same they are.
If you check the equality, you can use DataFrame#explain.

// maropu


On Tue, May 31, 2016 at 12:26 PM, pseudo oduesp <pseudo20...@gmail.com>
wrote:

> hi guys ,
> it s similare  thing to do :
>
> sqlcontext.join("select * from t1 join t2 on condition) and
>
> df1.join(df2,condition,'inner")??
>
> ps:df1.registertable('t1')
> ps:df2.registertable('t2')
> thanks
>



-- 
---
Takeshi Yamamuro


Re: unsure how to create 2 outputs from spark-sql udf expression

2016-05-25 Thread Takeshi Yamamuro
Hi,

How about this?
--
val func = udf((i: Int) => Tuple2(i, i))
val df = Seq((1, 0), (2, 5)).toDF("a", "b")
df.select(func($"a").as("r")).select($"r._1", $"r._2")

// maropu


On Thu, May 26, 2016 at 5:11 AM, Koert Kuipers <ko...@tresata.com> wrote:

> hello all,
>
> i have a single udf that creates 2 outputs (so a tuple 2). i would like to
> add these 2 columns to my dataframe.
>
> my current solution is along these lines:
> df
>   .withColumn("_temp_", udf(inputColumns))
>   .withColumn("x", col("_temp_)("_1"))
>   .withColumn("y", col("_temp_")("_2"))
>   .drop("_temp_")
>
> this works, but its not pretty with the temporary field stuff.
>
> i also tried this:
> val tmp = udf(inputColumns)
> df
>   .withColumn("x", tmp("_1"))
>   .withColumn("y", tmp("_2"))
>
> this also works, but unfortunately the udf is evaluated twice
>
> is there a better way to do this?
>
> thanks! koert
>



-- 
---
Takeshi Yamamuro


Re: Spark input size when filtering on parquet files

2016-05-26 Thread Takeshi Yamamuro
Hi,

Spark just prints #bytes in the web UI that is accumulated from
InputSplit#getLength (it is just a length of files).
Therefore, I'm afraid this metric does not reflect actual read #bytes for
parquet.
If you get the metric, you need to use other tools such as iostat or
something.

// maropu


// maropu


On Fri, May 27, 2016 at 5:45 AM, Dennis Hunziker <dennis.hunzi...@gmail.com>
wrote:

> Hi all
>
>
>
> I was looking into Spark 1.6.1 (Parquet 1.7.0, Hive 1.2.1) in order to
> find out about the improvements made in filtering/scanning parquet files
> when querying for tables using SparkSQL and how these changes relate to the
> new filter API introduced in Parquet 1.7.0.
>
>
>
> After checking the usual sources, I still can’t make sense of some of the
> numbers shown on the Spark UI. As an example, I’m looking at the collect
> stage for a query that’s selecting a single row from a table containing 1
> million numbers using a simple where clause (i.e. col1 = 50) and this
> is what I see on the UI:
>
>
>
> 0 SUCCESS ... 2.4 MB (hadoop) / 0
>
> 1 SUCCESS ... 2.4 MB (hadoop) / 25
>
> 2 SUCCESS ... 2.4 MB (hadoop) / 0
>
> 3 SUCCESS ... 2.4 MB (hadoop) / 0
>
>
>
> Based on the min/max statistics of each of the parquet parts, it makes
> sense not to expect any records for 3 out of the 4, because the record I’m
> looking for can only be in a single file. But why is the input size above
> shown as 2.4 MB, totaling up to an overall input size of 9.7 MB for the
> whole stage? Isn't it just meant to read the metadata and ignore the
> content of the file?
>
>
>
> Regards,
>
> Dennis
>



-- 
---
Takeshi Yamamuro


Re: Spark Job Execution halts during shuffle...

2016-05-26 Thread Takeshi Yamamuro
Hi,

If you get stuck in job fails, one of best practices is to increase
#partitions.
Also, you'd better off using DataFrame instread of RDD in terms of join
optimization.

// maropu


On Thu, May 26, 2016 at 11:40 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> Hello Team,
>
>
>  I am trying to perform join 2 rdds where one is of size 800 MB and the
> other is 190 MB. During the join step, my job halts and I don't see
> progress in the execution.
>
> This is the message I see on console -
>
> INFO spark.MapOutputTrackerMasterEndPoint: Asked to send map output
> locations for shuffle 0 to :4
> INFO spark.MapOutputTrackerMasterEndPoint: Asked to send map output
> locations for shuffle 1 to :4
>
> After these messages, I dont see any progress. I am using Spark 1.6.0
> version and yarn scheduler (running in YARN client mode). My cluster
> configurations is - 3 node cluster (1 master and 2 slaves). Each slave has
> 1 TB hard disk space, 300GB memory and 32 cores.
>
> HDFS block size is 128 MB.
>
> Thanks,
> Padma Ch
>



-- 
---
Takeshi Yamamuro


Re: unsure how to create 2 outputs from spark-sql udf expression

2016-05-26 Thread Takeshi Yamamuro
Couldn't you include all the needed columns in your input dataframe?

// maropu

On Fri, May 27, 2016 at 1:46 AM, Koert Kuipers <ko...@tresata.com> wrote:

> that is nice and compact, but it does not add the columns to an existing
> dataframe
>
> On Wed, May 25, 2016 at 11:39 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> How about this?
>> --
>> val func = udf((i: Int) => Tuple2(i, i))
>> val df = Seq((1, 0), (2, 5)).toDF("a", "b")
>> df.select(func($"a").as("r")).select($"r._1", $"r._2")
>>
>> // maropu
>>
>>
>> On Thu, May 26, 2016 at 5:11 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> hello all,
>>>
>>> i have a single udf that creates 2 outputs (so a tuple 2). i would like
>>> to add these 2 columns to my dataframe.
>>>
>>> my current solution is along these lines:
>>> df
>>>   .withColumn("_temp_", udf(inputColumns))
>>>   .withColumn("x", col("_temp_)("_1"))
>>>   .withColumn("y", col("_temp_")("_2"))
>>>   .drop("_temp_")
>>>
>>> this works, but its not pretty with the temporary field stuff.
>>>
>>> i also tried this:
>>> val tmp = udf(inputColumns)
>>> df
>>>   .withColumn("x", tmp("_1"))
>>>   .withColumn("y", tmp("_2"))
>>>
>>> this also works, but unfortunately the udf is evaluated twice
>>>
>>> is there a better way to do this?
>>>
>>> thanks! koert
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark input size when filtering on parquet files

2016-06-01 Thread Takeshi Yamamuro
Technically, yes.
I'm not sure there is a parquet api for easily catching file statistics
(min, max, ...) though,
if it exists, it seems we could skip some file splits in
`ParquetFileFormat`.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L273

// maropu

On Thu, Jun 2, 2016 at 5:51 AM, Dennis Hunziker <dennis.hunzi...@gmail.com>
wrote:

> Thanks, that makes sense. What I wonder though is that if we use parquet
> meta data caching, spark should be able to execute queries much faster when
> using a large amount of smaller .parquet files compared to a smaller amount
> of large ones. At least as long as the min/max indexing is efficient (i.e.
> the data is grouped/ordered). However, I'm not seeing this in my tests
> because of the overhead for creating many tasks for all these small files
> that mostly end up doing nothing at all. Is it possible to prevent that? I
> assume only if the driver was able to inspect the cached meta data and
> avoid creating tasks for files that aren't used in the first place.
>
>
> On 27 May 2016 at 04:25, Takeshi Yamamuro <linguin@gmail.com> wrote:
>
>> Hi,
>>
>> Spark just prints #bytes in the web UI that is accumulated from
>> InputSplit#getLength (it is just a length of files).
>> Therefore, I'm afraid this metric does not reflect actual read #bytes for
>> parquet.
>> If you get the metric, you need to use other tools such as iostat or
>> something.
>>
>> // maropu
>>
>>
>> // maropu
>>
>>
>> On Fri, May 27, 2016 at 5:45 AM, Dennis Hunziker <
>> dennis.hunzi...@gmail.com> wrote:
>>
>>> Hi all
>>>
>>>
>>>
>>> I was looking into Spark 1.6.1 (Parquet 1.7.0, Hive 1.2.1) in order to
>>> find out about the improvements made in filtering/scanning parquet files
>>> when querying for tables using SparkSQL and how these changes relate to the
>>> new filter API introduced in Parquet 1.7.0.
>>>
>>>
>>>
>>> After checking the usual sources, I still can’t make sense of some of
>>> the numbers shown on the Spark UI. As an example, I’m looking at the
>>> collect stage for a query that’s selecting a single row from a table
>>> containing 1 million numbers using a simple where clause (i.e. col1 =
>>> 50) and this is what I see on the UI:
>>>
>>>
>>>
>>> 0 SUCCESS ... 2.4 MB (hadoop) / 0
>>>
>>> 1 SUCCESS ... 2.4 MB (hadoop) / 25
>>>
>>> 2 SUCCESS ... 2.4 MB (hadoop) / 0
>>>
>>> 3 SUCCESS ... 2.4 MB (hadoop) / 0
>>>
>>>
>>>
>>> Based on the min/max statistics of each of the parquet parts, it makes
>>> sense not to expect any records for 3 out of the 4, because the record I’m
>>> looking for can only be in a single file. But why is the input size above
>>> shown as 2.4 MB, totaling up to an overall input size of 9.7 MB for the
>>> whole stage? Isn't it just meant to read the metadata and ignore the
>>> content of the file?
>>>
>>>
>>>
>>> Regards,
>>>
>>> Dennis
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Splitting RDD to exact number of partitions

2016-05-31 Thread Takeshi Yamamuro
If you don't hesitate the newest version, you try to use v2.0-preview.
http://spark.apache.org/news/spark-2.0.0-preview.html

There, you can control #partitions for input partitions without shuffles by
two parameters below;
spark.sql.files.maxPartitionBytes
spark.sql.files.openCostInBytes
( Not documented though, 

// maropu

On Tue, May 31, 2016 at 11:08 PM, Maciej Sokołowski <matemac...@gmail.com>
wrote:

> After setting shuffle to true I get expected 128 partitions, but I'm
> worried about performance of such solution - especially I see that some
> shuffling is done because size of partitions chages:
>
> scala> sc.textFile("hdfs:///proj/dFAB_test/testdata/perf_test1.csv",
> minPartitions=128).coalesce(128, true).mapPartitions{rows =>
> Iterator(rows.length)}.collect()
> res3: Array[Int] = Array(768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 828, 896, 896, 896, 896, 896, 896,
> 896, 896, 896, 896, 896, 896, 850, 786, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768)
>
> I use spark 1.6.0
>
>
> On 31 May 2016 at 16:02, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Value for shuffle is false by default.
>>
>> Have you tried setting it to true ?
>>
>> Which Spark release are you using ?
>>
>> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemac...@gmail.com>
>> wrote:
>>
>>> Hello Spark users and developers.
>>>
>>> I read file and want to ensure that it has exact number of partitions,
>>> for example 128.
>>>
>>> In documentation I found:
>>>
>>> def textFile(path: String, minPartitions: Int = defaultMinPartitions):
>>> RDD[String]
>>>
>>> But argument here is minimal number of partitions, so I use coalesce to
>>> ensure desired number of partitions:
>>>
>>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
>>> Ordering[T] = null): RDD[T]
>>> //Return a new RDD that is reduced into numPartitions partitions.
>>>
>>> So I combine them and get number of partitions lower than expected:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).getNumPartitions
>>> res14: Int = 126
>>>
>>> Is this expected behaviour? File contains 10 lines, size of
>>> partitions before and after coalesce:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).mapPartitions{rows =>
>>> Iterator(rows.length)}.collect()
>>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>>> 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>>
>>> So two partitions are double the size. Is this expected behaviour or is
>>> it some kind of bug?
>>>
>>> Thanks,
>>> Maciej Sokołowski
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: GraphX Java API

2016-05-29 Thread Takeshi Yamamuro
These package are used only for Scala.

On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) <
abhishekkuma...@deloitte.com> wrote:

> Hey,
>
> ·   I see some graphx packages listed here:
>
> http://spark.apache.org/docs/latest/api/java/index.html
>
> ·   org.apache.spark.graphx
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/package-frame.html>
>
> ·   org.apache.spark.graphx.impl
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/impl/package-frame.html>
>
> ·   org.apache.spark.graphx.lib
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/lib/package-frame.html>
>
> ·   org.apache.spark.graphx.util
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/util/package-frame.html>
>
> Aren’t they meant to be used with JAVA?
>
> Thanks
>
>
>
> *From:* Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
> *Sent:* Friday, May 27, 2016 4:52 PM
> *To:* Kumar, Abhishek (US - Bengaluru) <abhishekkuma...@deloitte.com>;
> user@spark.apache.org
> *Subject:* RE: GraphX Java API
>
>
>
> GraphX APis are available only in Scala. If you need to use GraphX you
> need to switch to Scala.
>
>
>
> *From:* Kumar, Abhishek (US - Bengaluru) [
> mailto:abhishekkuma...@deloitte.com <abhishekkuma...@deloitte.com>]
> *Sent:* 27 May 2016 19:59
> *To:* user@spark.apache.org
> *Subject:* GraphX Java API
>
>
>
> Hi,
>
>
>
> We are trying to consume the Java API for GraphX, but there is no
> documentation available online on the usage or examples. It would be great
> if we could get some examples in Java.
>
>
>
> Thanks and regards,
>
>
>
> *Abhishek Kumar*
>
>
>
>
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> v.E.1
>
>
>
>
>
>
>
>
>



-- 
---
Takeshi Yamamuro


Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Takeshi Yamamuro
Hi,

Could you check the issue also occurs in v1.6.1 and v2.0?

// maropu

On Wed, Jun 22, 2016 at 2:42 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I have an RDD[String, MyObj] which is a result of Join + Map operation. It
> has no partitioner info. I run reduceByKey without passing any Partitioner
> or partition counts.  I observed that output aggregation result for given
> key is incorrect sometime. like 1 out of 5 times. It looks like reduce
> operation is joining values from two different keys. There is no
> configuration change between multiple runs. I am scratching my head over
> this. I verified results by printing out RDD before and after reduce
> operation; collecting subset at driver.
>
> Besides shuffle and storage memory fraction I use following options:
>
> sparkConf.set("spark.driver.userClassPathFirst","true")
> sparkConf.set("spark.unsafe.offHeap","true")
> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>
>
> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>
> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
> <https://twitter.com/Xactly>  [image: Facebook]
> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
> <http://www.youtube.com/xactlycorporation>




-- 
---
Takeshi Yamamuro


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Takeshi Yamamuro
Hi,

An argument for `functions.count` is needed for per-column counting;
df.groupBy($"a").agg(count($"b"))

// maropu

On Thu, Jun 23, 2016 at 1:27 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> See the first example in:
>
> http://www.w3schools.com/sql/sql_func_count.asp
>
> On Wed, Jun 22, 2016 at 9:21 AM, Jakub Dubovsky <
> spark.dubovsky.ja...@gmail.com> wrote:
>
>> Hey Ted,
>>
>> thanks for reacting.
>>
>> I am refering to both of them. They both take column as parameter
>> regardless of its type. Intuition here is that count should take no
>> parameter. Or am I missing something?
>>
>> Jakub
>>
>> On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Are you referring to the following method in
>>> sql/core/src/main/scala/org/apache/spark/sql/functions.scala :
>>>
>>>   def count(e: Column): Column = withAggregateFunction {
>>>
>>> Did you notice this method ?
>>>
>>>   def count(columnName: String): TypedColumn[Any, Long] =
>>>
>>> On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
>>> spark.dubovsky.ja...@gmail.com> wrote:
>>>
>>>> Hey sparkers,
>>>>
>>>> an aggregate function *count* in *org.apache.spark.sql.functions*
>>>> package takes a *column* as an argument. Is this needed for something?
>>>> I find it confusing that I need to supply a column there. It feels like it
>>>> might be distinct count or something. This can be seen in latest
>>>> documentation
>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$>
>>>> .
>>>>
>>>> I am considering filling this in spark bug tracker. Any opinions on
>>>> this?
>>>>
>>>> Thanks
>>>>
>>>> Jakub
>>>>
>>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: spark sql broadcast join ?

2016-06-17 Thread Takeshi Yamamuro
Hi,

Spark sends a smaller table into all the works as broadcast variables,
and it joins the table partition-by-partiiton.
By default, if table size is under 10MB, the broadcast join works.
See:
http://spark.apache.org/docs/1.6.1/sql-programming-guide.html#other-configuration-options

// maropu


On Fri, Jun 17, 2016 at 4:05 AM, kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I had used broadcast join in spark-scala applications I did used
> partitionby
> (Hash Partitioner) and then persit for wide dependencies, present project
> which I am working on pretty much Hive migration to spark-sql which is
> pretty much sql to be honest no scala or python apps.
>
> My question how to achieve broadcast join in plain spark-sql ? at the
> moment
> join between two talbes is taking ages.
>
> Thanks
> Sri
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-broadcast-join-tp27184.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: JDBC load into tempTable

2016-06-20 Thread Takeshi Yamamuro
Hi,

Currently, no.
spark cannot preserve the order of input data from jdbc.
If you want to have the ordered ids, you need to sort them explicitly.

// maropu

On Mon, Jun 20, 2016 at 7:41 PM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

> Hi,
>
> I have a SQL server table with 500,000,000 rows with primary key (unique
> clustered index) on ID column
>
> If I load it through JDBC into a DataFrame and register it
> via registerTempTable will the data will be in the order of ID in tempTable?
>
> Thanks
>



-- 
---
Takeshi Yamamuro


Re: Switching broadcast mechanism from torrrent

2016-06-19 Thread Takeshi Yamamuro
How about using `transient` annotations?

// maropu

On Sun, Jun 19, 2016 at 10:51 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> Just updating on my findings for future reference.
> The problem was that after refactoring my code I ended up with a scala
> object which held SparkContext as a member, eg:
> object A  {
>  sc: SparkContext = new SparkContext
>  def mapFunction  {}
> }
>
> and when I called rdd.map(A.mapFunction) it failed as A.sc is not
> serializable.
>
> Thanks,
> Daniel
>
> On Tue, Jun 7, 2016 at 10:13 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> Since `HttpBroadcastFactory` has already been removed in master, so
>> you cannot use the broadcast mechanism in future releases.
>>
>> Anyway, I couldn't find a root cause only from the stacktraces...
>>
>> // maropu
>>
>>
>>
>>
>> On Mon, Jun 6, 2016 at 2:14 AM, Daniel Haviv <
>> daniel.ha...@veracity-group.com> wrote:
>>
>>> Hi,
>>> I've set  spark.broadcast.factory to
>>> org.apache.spark.broadcast.HttpBroadcastFactory and it indeed resolve my
>>> issue.
>>>
>>> I'm creating a dataframe which creates a broadcast variable internally
>>> and then fails due to the torrent broadcast with the following stacktrace:
>>> Caused by: org.apache.spark.SparkException: Failed to get
>>> broadcast_3_piece0 of broadcast_3
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
>>> at
>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)
>>>
>>> I'm using spark 1.6.0 on CDH 5.7
>>>
>>> Thanks,
>>> Daniel
>>>
>>>
>>> On Wed, Jun 1, 2016 at 5:52 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> I found spark.broadcast.blockSize but no parameter to switch broadcast
>>>> method.
>>>>
>>>> Can you describe the issues with torrent broadcast in more detail ?
>>>>
>>>> Which version of Spark are you using ?
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Jun 1, 2016 at 7:48 AM, Daniel Haviv <
>>>> daniel.ha...@veracity-group.com> wrote:
>>>>
>>>>> Hi,
>>>>> Our application is failing due to issues with the torrent broadcast,
>>>>> is there a way to switch to another broadcast method ?
>>>>>
>>>>> Thank you.
>>>>> Daniel
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Dataset Select Function after Aggregate Error

2016-06-17 Thread Takeshi Yamamuro
Hi,

In 2.0, you can say;
val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
ds.groupBy($"_1").count.select($"_1", $"count").show


// maropu


On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:

> Hi Pedro,
>
> In 1.6.1, you can do:
> >> ds.groupBy(_.uid).count().map(_._1)
> or
> >> ds.groupBy(_.uid).count().select($"value".as[String])
>
> It doesn't have the exact same syntax as for DataFrame.
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
>
> It might be different in 2.0.
>
> Xinh
>
> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I am working on using Datasets in 1.6.1 and eventually 2.0 when its
>> released.
>>
>> I am running the aggregate code below where I have a dataset where the
>> row has a field uid:
>>
>> ds.groupBy(_.uid).count()
>> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, _2:
>> bigint]
>>
>> This works as expected, however, attempts to run select statements after
>> fails:
>> ds.groupBy(_.uid).count().select(_._1)
>> // error: missing parameter type for expanded function ((x$2) => x$2._1)
>> ds.groupBy(_.uid).count().select(_._1)
>>
>> I have tried several variants, but nothing seems to work. Below is the
>> equivalent Dataframe code which works as expected:
>> df.groupBy("uid").count().select("uid")
>>
>> Thanks!
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Running Spark in local mode

2016-06-19 Thread Takeshi Yamamuro
Hi,

In a local mode, spark runs in a single JVM that has a master and one
executor with `k` threads.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L94

// maropu


On Sun, Jun 19, 2016 at 5:39 PM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

> Hi,
>
> I have been told Spark in Local mode is simplest for testing. Spark
> document covers little on local mode except the cores used in --master
> local[k].
>
> Where are the the driver program, executor and resources. Do I need to
> start worker threads and how many app I can use safely without exceeding
> memory allocated etc?
>
> Thanking you
>
>
>


-- 
---
Takeshi Yamamuro


Re: Dataset Select Function after Aggregate Error

2016-06-18 Thread Takeshi Yamamuro
'$' is just replaced with 'Column' inside.

// maropu

On Sat, Jun 18, 2016 at 12:59 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Thanks Xinh and Takeshi,
>
> I am trying to avoid map since my impression is that this uses a Scala
> closure so is not optimized as well as doing column-wise operations is.
>
> Looks like the $ notation is the way to go, thanks for the help. Is there
> an explanation of how this works? I imagine it is a method/function with
> its name defined as $ in Scala?
>
> Lastly, are there prelim Spark 2.0 docs? If there isn't a good
> description/guide of using this syntax I would be willing to contribute
> some documentation.
>
> Pedro
>
> On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> In 2.0, you can say;
>> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>> ds.groupBy($"_1").count.select($"_1", $"count").show
>>
>>
>> // maropu
>>
>>
>> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
>>
>>> Hi Pedro,
>>>
>>> In 1.6.1, you can do:
>>> >> ds.groupBy(_.uid).count().map(_._1)
>>> or
>>> >> ds.groupBy(_.uid).count().select($"value".as[String])
>>>
>>> It doesn't have the exact same syntax as for DataFrame.
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
>>>
>>> It might be different in 2.0.
>>>
>>> Xinh
>>>
>>> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am working on using Datasets in 1.6.1 and eventually 2.0 when its
>>>> released.
>>>>
>>>> I am running the aggregate code below where I have a dataset where the
>>>> row has a field uid:
>>>>
>>>> ds.groupBy(_.uid).count()
>>>> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string,
>>>> _2: bigint]
>>>>
>>>> This works as expected, however, attempts to run select statements
>>>> after fails:
>>>> ds.groupBy(_.uid).count().select(_._1)
>>>> // error: missing parameter type for expanded function ((x$2) =>
>>>> x$2._1)
>>>> ds.groupBy(_.uid).count().select(_._1)
>>>>
>>>> I have tried several variants, but nothing seems to work. Below is the
>>>> equivalent Dataframe code which works as expected:
>>>> df.groupBy("uid").count().select("uid")
>>>>
>>>> Thanks!
>>>> --
>>>> Pedro Rodriguez
>>>> PhD Student in Distributed Machine Learning | CU Boulder
>>>> UC Berkeley AMPLab Alumni
>>>>
>>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>>> Github: github.com/EntilZha | LinkedIn:
>>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>>
>>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


-- 
---
Takeshi Yamamuro


Re: Dataset Select Function after Aggregate Error

2016-06-18 Thread Takeshi Yamamuro
which version you use?
I passed in 2.0-preview as follows;
---

Spark context available as 'sc' (master = local[*], app id =
local-1466234043659).

Spark session available as 'spark'.

Welcome to

    __

 / __/__  ___ _/ /__

_\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-preview

  /_/



Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_31)

Type in expressions to have them evaluated.

Type :help for more information.


scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS

hive.metastore.schema.verification is not enabled so recording the schema
version 1.2.0

ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]

scala> ds.groupBy($"_1").count.select($"_1", $"count").show

+---+-+

| _1|count|

+---+-+

|  1|1|

|  2|1|

+---+-+



On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet
> Takeshi. It unfortunately doesn't compile.
>
> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>
> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
> :28: error: type mismatch;
>  found   : org.apache.spark.sql.ColumnName
>  required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row,
> Long),?]
>   ds.groupBy($"_1").count.select($"_1", $"count").show
>  ^
>
> I also gave a try to Xinh's suggestion using the code snippet below
> (partially from spark docs)
> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2), Person("Pedro",
> 24), Person("Bob", 42)).toDS()
> scala> ds.groupBy(_.name).count.select($"name".as[String]).show
> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input
> columns: [];
> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show
> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input
> columns: [];
> scala> ds.groupBy($"name").count.select($"_1".as[String]).show
> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input
> columns: [];
>
> Looks like there are empty columns for some reason, the code below works
> fine for the simple aggregate
> scala> ds.groupBy(_.name).count.show
>
> Would be great to see an idiomatic example of using aggregates like these
> mixed with spark.sql.functions.
>
> Pedro
>
> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> Thanks Xinh and Takeshi,
>>
>> I am trying to avoid map since my impression is that this uses a Scala
>> closure so is not optimized as well as doing column-wise operations is.
>>
>> Looks like the $ notation is the way to go, thanks for the help. Is there
>> an explanation of how this works? I imagine it is a method/function with
>> its name defined as $ in Scala?
>>
>> Lastly, are there prelim Spark 2.0 docs? If there isn't a good
>> description/guide of using this syntax I would be willing to contribute
>> some documentation.
>>
>> Pedro
>>
>> On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> In 2.0, you can say;
>>> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>> ds.groupBy($"_1").count.select($"_1", $"count").show
>>>
>>>
>>> // maropu
>>>
>>>
>>> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com>
>>> wrote:
>>>
>>>> Hi Pedro,
>>>>
>>>> In 1.6.1, you can do:
>>>> >> ds.groupBy(_.uid).count().map(_._1)
>>>> or
>>>> >> ds.groupBy(_.uid).count().select($"value".as[String])
>>>>
>>>> It doesn't have the exact same syntax as for DataFrame.
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
>>>>
>>>> It might be different in 2.0.
>>>>
>>>> Xinh
>>>>
>>>> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <
>>>> ski.rodrig...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am working on using Datasets in 1.6.1 and eventually 2.0 when its
>>>>> released.
>>>&

Re: Running Spark in local mode

2016-06-19 Thread Takeshi Yamamuro
There are many technical differences inside though, how to use is the
almost same with each other.
yea, in a standalone mode, spark runs in a cluster way: see
http://spark.apache.org/docs/1.6.1/cluster-overview.html

// maropu

On Sun, Jun 19, 2016 at 6:14 PM, Ashok Kumar <ashok34...@yahoo.com> wrote:

> thank you
>
> What are the main differences between a local mode and standalone mode. I
> understand local mode does not support cluster. Is that the only difference?
>
>
>
> On Sunday, 19 June 2016, 9:52, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>
> Hi,
>
> In a local mode, spark runs in a single JVM that has a master and one
> executor with `k` threads.
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L94
>
> // maropu
>
>
> On Sun, Jun 19, 2016 at 5:39 PM, Ashok Kumar <ashok34...@yahoo.com.invalid
> > wrote:
>
> Hi,
>
> I have been told Spark in Local mode is simplest for testing. Spark
> document covers little on local mode except the cores used in --master
> local[k].
>
> Where are the the driver program, executor and resources. Do I need to
> start worker threads and how many app I can use safely without exceeding
> memory allocated etc?
>
> Thanking you
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
>
>


-- 
---
Takeshi Yamamuro


Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Takeshi Yamamuro
Hi,

How about caching the result of `select * from a where a.c2 < 1000`, then
joining them?
You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to enable
broadcast joins for the result table.

// maropu


On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247 <zhen...@dtdream.com> wrote:

> Hi everyone,
>
> I ran a SQL join statement on Spark 1.6.1 like this:
> select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
> and it took quite a long time because It is a SortMergeJoin and the two
> tables are big.
>
>
> In fact,  the size of filter result(select * from a where a.c2 < 1000) is
> very small, and I think a better solution is to use a BroadcastJoin with
> the filter result, but  I know  the physical plan is static and it won't be
> changed.
>
> So, can we make the physical plan more adaptive? (In this example, I mean
> using a  BroadcastHashJoin instead of SortMergeJoin automatically. )
>
>
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Takeshi Yamamuro
Seems it is hard to predict the output size of filters because the current
spark has limited statistics of input data. A few hours ago, Reynold
created a ticket for cost-based optimizer framework in
https://issues.apache.org/jira/browse/SPARK-16026.
If you have ideas, questions, and suggestions, feel free to join the
discussion.

// maropu


On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247 <zhen...@dtdream.com> wrote:

>
>
> Thanks for your reply, In fact, that is what i just did
>
> But my question is:
> Can we change the spark join behavior more clever, to turn a sortmergejoin
> into broadcasthashjoin automatically when if "found" that a output RDD is
> small enough?
>
>
> ------
> 发件人:Takeshi Yamamuro <linguin@gmail.com>
> 发送时间:2016年6月20日(星期一) 19:16
> 收件人:梅西0247 <zhen...@dtdream.com>
> 抄 送:user <user@spark.apache.org>
> 主 题:Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
>
> Hi,
>
> How about caching the result of `select * from a where a.c2 < 1000`, then
> joining them?
> You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to
> enable broadcast joins for the result table.
>
> // maropu
>
>
> On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247 <zhen...@dtdream.com> wrote:
> Hi everyone,
>
> I ran a SQL join statement on Spark 1.6.1 like this:
> select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
> and it took quite a long time because It is a SortMergeJoin and the two
> tables are big.
>
>
> In fact,  the size of filter result(select * from a where a.c2 < 1000) is
> very small, and I think a better solution is to use a BroadcastJoin with
> the filter result, but  I know  the physical plan is static and it won't be
> changed.
>
> So, can we make the physical plan more adaptive? (In this example, I mean
> using a  BroadcastHashJoin instead of SortMergeJoin automatically. )
>
>
>
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
>
>


-- 
---
Takeshi Yamamuro


Re: Aggregator (Spark 2.0) skips aggregation is zero(0 returns null

2016-06-26 Thread Takeshi Yamamuro
Hi,

This behaviour seems to be expected because you must ensure `b + zero() = b`
The your case `b + null = null` breaks this rule.
This is the same with v1.6.1.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57

// maropu


On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com> wrote:

> Sometimes, the BUF for the aggregator may depend on the actual input.. and
> while this passes the responsibility to handle null in merge/reduce to the
> developer, it sounds fine to me if he is the one who put null in zero()
> anyway.
> Now, it seems that the aggregation is skipped entirely when zero() =
> null. Not sure if that was the behaviour in 1.6
>
> Is this behaviour wanted ?
>
> Thanks,
> Amit
>
> Aggregator example:
>
> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, 
> Integer> {
>
>   @Override
>   public Integer zero() {
> return null;
>   }
>
>   @Override
>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
> if (b == null) {
>   b = 0;
> }
> return b + a._2();
>   }
>
>   @Override
>   public Integer merge(Integer b1, Integer b2) {
> if (b1 == null) {
>   return b2;
>     } else if (b2 == null) {
>   return b1;
> } else {
>   return b1 + b2;
> }
>   }
>
>


-- 
---
Takeshi Yamamuro


Re: Aggregator (Spark 2.0) skips aggregation is zero(0 returns null

2016-06-26 Thread Takeshi Yamamuro
No, TypedAggregateExpression that uses Aggregator#zero is different between
v2.0 and v1.6.
v2.0:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
v1.6:
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115

// maropu


On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:

> This "if (value == null)" condition you point to exists in 1.6 branch as
> well, so that's probably not the reason.
>
> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Whatever it is, this is expected; if an initial value is null, spark
>> codegen removes all the aggregates.
>> See:
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>
>> // maropu
>>
>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>>> Not sure about what's the rule in case of `b + null = null` but the same
>>> code works perfectly in 1.6.1, just tried it..
>>>
>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> This behaviour seems to be expected because you must ensure `b + zero()
>>>> = b`
>>>> The your case `b + null = null` breaks this rule.
>>>> This is the same with v1.6.1.
>>>> See:
>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sometimes, the BUF for the aggregator may depend on the actual input..
>>>>> and while this passes the responsibility to handle null in merge/reduce to
>>>>> the developer, it sounds fine to me if he is the one who put null in 
>>>>> zero()
>>>>> anyway.
>>>>> Now, it seems that the aggregation is skipped entirely when zero() =
>>>>> null. Not sure if that was the behaviour in 1.6
>>>>>
>>>>> Is this behaviour wanted ?
>>>>>
>>>>> Thanks,
>>>>> Amit
>>>>>
>>>>> Aggregator example:
>>>>>
>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, 
>>>>> Integer, Integer> {
>>>>>
>>>>>   @Override
>>>>>   public Integer zero() {
>>>>> return null;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>> if (b == null) {
>>>>>   b = 0;
>>>>> }
>>>>> return b + a._2();
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>> if (b1 == null) {
>>>>>   return b2;
>>>>> } else if (b2 == null) {
>>>>>   return b1;
>>>>> } else {
>>>>>   return b1 + b2;
>>>>> }
>>>>>   }
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


-- 
---
Takeshi Yamamuro


Re: Aggregator (Spark 2.0) skips aggregation is zero(0 returns null

2016-06-26 Thread Takeshi Yamamuro
Whatever it is, this is expected; if an initial value is null, spark
codegen removes all the aggregates.
See:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199

// maropu

On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:

> Not sure about what's the rule in case of `b + null = null` but the same
> code works perfectly in 1.6.1, just tried it..
>
> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> This behaviour seems to be expected because you must ensure `b + zero() =
>> b`
>> The your case `b + null = null` breaks this rule.
>> This is the same with v1.6.1.
>> See:
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>
>> // maropu
>>
>>
>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>>> Sometimes, the BUF for the aggregator may depend on the actual input..
>>> and while this passes the responsibility to handle null in merge/reduce to
>>> the developer, it sounds fine to me if he is the one who put null in zero()
>>> anyway.
>>> Now, it seems that the aggregation is skipped entirely when zero() =
>>> null. Not sure if that was the behaviour in 1.6
>>>
>>> Is this behaviour wanted ?
>>>
>>> Thanks,
>>> Amit
>>>
>>> Aggregator example:
>>>
>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, 
>>> Integer, Integer> {
>>>
>>>   @Override
>>>   public Integer zero() {
>>> return null;
>>>   }
>>>
>>>   @Override
>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>     if (b == null) {
>>>   b = 0;
>>> }
>>> return b + a._2();
>>>   }
>>>
>>>   @Override
>>>   public Integer merge(Integer b1, Integer b2) {
>>> if (b1 == null) {
>>>   return b2;
>>> } else if (b2 == null) {
>>>   return b1;
>>> } else {
>>>   return b1 + b2;
>>> }
>>>   }
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-16 Thread Takeshi Yamamuro
Hi,

Have you checked the statistics of storage memory, or something?

// maropu

On Thu, Jun 16, 2016 at 1:37 PM, Cassa L <lcas...@gmail.com> wrote:

> Hi,
>  I did set  --driver-memory 4G. I still run into this issue after 1 hour
> of data load.
>
> I also tried version 1.6 in test environment. I hit this issue much faster
> than in 1.5.1 setup.
> LCassa
>
> On Tue, Jun 14, 2016 at 3:57 PM, Gaurav Bhatnagar <gaura...@gmail.com>
> wrote:
>
>> try setting the option --driver-memory 4G
>>
>> On Tue, Jun 14, 2016 at 3:52 PM, Ben Slater <ben.sla...@instaclustr.com>
>> wrote:
>>
>>> A high level shot in the dark but in our testing we found Spark 1.6 a
>>> lot more reliable in low memory situations (presumably due to
>>> https://issues.apache.org/jira/browse/SPARK-1). If it’s an option,
>>> probably worth a try.
>>>
>>> Cheers
>>> Ben
>>>
>>> On Wed, 15 Jun 2016 at 08:48 Cassa L <lcas...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I would appreciate any clue on this. It has become a bottleneck for our
>>>> spark job.
>>>>
>>>> On Mon, Jun 13, 2016 at 2:56 PM, Cassa L <lcas...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm using spark 1.5.1 version. I am reading data from Kafka into Spark 
>>>>> and writing it into Cassandra after processing it. Spark job starts fine 
>>>>> and runs all good for some time until I start getting below errors. Once 
>>>>> these errors come, job start to lag behind and I see that job has 
>>>>> scheduling and processing delays in streaming  UI.
>>>>>
>>>>> Worker memory is 6GB, executor-memory is 5GB, I also tried to tweak 
>>>>> memoryFraction parameters. Nothing works.
>>>>>
>>>>>
>>>>> 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with 
>>>>> curMem=565394, maxMem=2778495713
>>>>> 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored 
>>>>> as bytes in memory (estimated size 3.9 KB, free 2.6 GB)
>>>>> 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652 
>>>>> took 2 ms
>>>>> 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory 
>>>>> threshold of 1024.0 KB for computing block broadcast_69652 in memory.
>>>>> 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache 
>>>>> broadcast_69652 in memory! (computed 496.0 B so far)
>>>>> 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6 
>>>>> GB (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6 
>>>>> GB.
>>>>> 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to 
>>>>> disk instead.
>>>>> 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
>>>>> 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID 
>>>>> 452316). 2043 bytes result sent to driver
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> L
>>>>>
>>>>>
>>>> --
>>> 
>>> Ben Slater
>>> Chief Product Officer
>>> Instaclustr: Cassandra + Spark - Managed | Consulting | Support
>>> +61 437 929 798
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Optimal way to re-partition from a single partition

2016-02-08 Thread Takeshi Yamamuro
Hi,

Plz use DataFrame#repartition.

On Tue, Feb 9, 2016 at 7:30 AM, Cesar Flores <ces...@gmail.com> wrote:

>
> I have a data frame which I sort using orderBy function. This operation
> causes my data frame to go to a single partition. After using those
> results, I would like to re-partition to a larger number of partitions.
> Currently I am just doing:
>
> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
> partition and around 14 million records
> val newDF =  hc.createDataFrame(rdd, df.schema)
>
> This process is really slow. Is there any other way of achieving this
> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>
>
> Thanks a lot
> --
> Cesar Flores
>



-- 
---
Takeshi Yamamuro


Re: Optimal way to re-partition from a single partition

2016-02-09 Thread Takeshi Yamamuro
Hi,

DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
`HashPartitioning`.
`RangePartitioning` roughly samples input data and internally computes
partition bounds
to split given rows into `spark.sql.shuffle.partitions` partitions.
Therefore, when sort keys are highly skewed, I think some partitions could
end up being empty
(that is, # of result partitions is lower than `spark.sql.shuffle.partitions`
.


On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> For sql shuffle operations like groupby, the number of output partitions
> is controlled by spark.sql.shuffle.partitions. But, it seems orderBy does
> not honour this.
>
> In my small test, I could see that the number of partitions  in DF
> returned by orderBy was equal to the total number of distinct keys. Are you
> observing the same, I mean do you have a single value for all rows in the
> column on which you are running orderBy? If yes, you are better off not
> running the orderBy clause.
>
> May be someone from spark sql team could answer that how should the
> partitioning of the output DF be handled when doing an orderBy?
>
> Hemant
> www.snappydata.io
> https://github.com/SnappyDataInc/snappydata
>
>
>
>
> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <ces...@gmail.com> wrote:
>
>>
>> I have a data frame which I sort using orderBy function. This operation
>> causes my data frame to go to a single partition. After using those
>> results, I would like to re-partition to a larger number of partitions.
>> Currently I am just doing:
>>
>> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
>> partition and around 14 million records
>> val newDF =  hc.createDataFrame(rdd, df.schema)
>>
>> This process is really slow. Is there any other way of achieving this
>> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>>
>>
>> Thanks a lot
>> --
>> Cesar Flores
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark metrics question

2016-02-05 Thread Takeshi Yamamuro
How about using `spark.jars` to send jars into a cluster?

On Sat, Feb 6, 2016 at 12:00 AM, Matt K <matvey1...@gmail.com> wrote:

> Yes. And what I'm trying to figure out if there's a way to package the jar
> in such a way that I don't have to install it on every Executor node.
>
>
> On Wed, Feb 3, 2016 at 7:46 PM, Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Hi Matt,
>>
>> does the custom class you want to package reports metrics of each
>> Executor?
>>
>> Thanks
>>
>> On 3 February 2016 at 15:56, Matt K <matvey1...@gmail.com> wrote:
>>
>>> Thanks for sharing Yiannis, looks very promising!
>>>
>>> Do you know if I can package a custom class with my application, or does
>>> it have to be pre-deployed on all Executor nodes?
>>>
>>> On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas <johngou...@gmail.com>
>>> wrote:
>>>
>>>> Hi Matt,
>>>>
>>>> there is some related work I recently did in IBM Research for
>>>> visualizing the metrics produced.
>>>> You can read about it here
>>>> http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
>>>> We recently opensourced it if you are interested to have a deeper look
>>>> to it: https://github.com/ibm-research-ireland/sparkoscope
>>>>
>>>> Thanks,
>>>> Yiannis
>>>>
>>>> On 3 February 2016 at 13:32, Matt K <matvey1...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I'm looking to create a custom sync based on Spark's Metrics System:
>>>>>
>>>>> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>>>>>
>>>>> If I want to collect metrics from the Driver, Master, and Executor
>>>>> nodes, should the jar with the custom class be installed on Driver, 
>>>>> Master,
>>>>> and Executor nodes?
>>>>>
>>>>> Also, on Executor nodes, does the MetricsSystem run inside the
>>>>> Executor's JVM?
>>>>>
>>>>> Thanks,
>>>>> -Matt
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> www.calcmachine.com - easy online calculator.
>>>
>>
>>
>
>
> --
> www.calcmachine.com - easy online calculator.
>



-- 
---
Takeshi Yamamuro


Re: What is the best way to JOIN two 10TB csv files and three 100kb files on Spark?

2016-02-05 Thread Takeshi Yamamuro
Hi,

How about using broadcast joins?
largeDf.join(broadcast(smallDf), "joinKey")

On Sat, Feb 6, 2016 at 2:25 AM, Rex X <dnsr...@gmail.com> wrote:

> Dear all,
>
> The new DataFrame of spark is extremely fast. But out cluster have limited
> RAM (~500GB).
>
> What is the best way to do such a big table Join?
>
> Any sample code is greatly welcome!
>
>
> Best,
> Rex
>
>


-- 
---
Takeshi Yamamuro


Re: rdd cache priority

2016-02-04 Thread Takeshi Yamamuro
Hi,

u're right; rdd3 is not totally cached and it is re-computed every time.
If MEMORY_AND_DISK, rdd3 is written to disk.

Also, the current Spark does not automatically unpersist rdds depends
on frequency of use.

On Fri, Feb 5, 2016 at 12:15 PM, charles li <charles.up...@gmail.com> wrote:

> say I have 2 RDDs, RDD1 and RDD2.
>
> both are 20g in memory.
>
> and I cache both of them in memory using RDD1.cache() and RDD2.cache()
>
>
> the in the further steps on my app, I never use RDD1 but use RDD2 for lots
> of time.
>
>
> then here is my question:
>
> if there is only 40G memory in my cluster, and here I have another RDD,
> RDD3 for 20g, what happened if I cache RDD3 using RDD3.cache()?
>
>
> as the document says, cache using the default cache level : MEMORY_ONLY .
> it means that it will not definitely cache RDD3 but re-compute it every
> time used.
>
> I feel a little confused, will spark help me remove RDD1 and put RDD3 in
> the memory?
>
> or is there any concept like " Priority cache " in spark?
>
>
> great thanks
>
>
>
> --
> *----------*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>



-- 
---
Takeshi Yamamuro


Re: sc.textFile the number of the workers to parallelize

2016-02-04 Thread Takeshi Yamamuro
Hi,

ISTM these tasks are just assigned with executors in preferred nodes, so
how about repartitioning rdd?

s3File.repartition(9).count

On Fri, Feb 5, 2016 at 5:04 AM, Lin, Hao <hao@finra.org> wrote:

> Hi,
>
>
>
> I have a question on the number of workers that Spark enable to
> parallelize the loading of files using sc.textFile. When I used sc.textFile
> to access multiple files in AWS S3, it seems to only enable 2 workers
> regardless of how many worker nodes I have in my cluster. So how does Spark
> configure the parallelization in regard of the size of cluster nodes? In
> the following case, spark has 896 tasks split between only two nodes
> 10.162.97.235 and 10.162.97.237, while I have 9 nodes in the cluster.
>
>
>
> thanks
>
>
>
> Example of doing a count:
>
>  scala> s3File.count
>
> 16/02/04 18:12:06 INFO SparkContext: Starting job: count at :30
>
> 16/02/04 18:12:06 INFO DAGScheduler: Got job 0 (count at :30)
> with 896 output partitions
>
> 16/02/04 18:12:06 INFO DAGScheduler: Final stage: ResultStage 0 (count at
> :30)
>
> 16/02/04 18:12:06 INFO DAGScheduler: Parents of final stage: List()
>
> 16/02/04 18:12:06 INFO DAGScheduler: Missing parents: List()
>
> 16/02/04 18:12:06 INFO DAGScheduler: Submitting ResultStage 0
> (MapPartitionsRDD[1] at textFile at :27), which has no missing
> parents
>
> 16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 3.0 KB, free 228.3 KB)
>
> 16/02/04 18:12:07 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 1834.0 B, free 230.1 KB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.162.98.112:46425 (size: 1834.0 B, free: 517.4 MB)
>
> 16/02/04 18:12:07 INFO SparkContext: Created broadcast 1 from broadcast at
> DAGScheduler.scala:1006
>
> 16/02/04 18:12:07 INFO DAGScheduler: Submitting 896 missing tasks from
> ResultStage 0 (MapPartitionsRDD[1] at textFile at :27)
>
> 16/02/04 18:12:07 INFO YarnScheduler: Adding task set 0.0 with 896 tasks
>
> 16/02/04 18:12:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 0, 10.162.97.235, partition 0,RACK_LOCAL, 2213 bytes)
>
> 16/02/04 18:12:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 1, 10.162.97.237, partition 1,RACK_LOCAL, 2213 bytes)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.162.97.235:38643 (size: 1834.0 B, free: 1259.8 MB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.162.97.237:45360 (size: 1834.0 B, free: 1259.8 MB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on 10.162.97.237:45360 (size: 23.8 KB, free: 1259.8 MB)
>
> 16/02/04 18:12:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on 10.162.97.235:38643 (size: 23.8 KB, free: 1259.8 MB)
> Confidentiality Notice:: This email, including attachments, may include
> non-public, proprietary, confidential or legally privileged information. If
> you are not an intended recipient or an authorized agent of an intended
> recipient, you are hereby notified that any dissemination, distribution or
> copying of the information contained in or transmitted with this e-mail is
> unauthorized and strictly prohibited. If you have received this email in
> error, please notify the sender by replying to this message and permanently
> delete this e-mail, its attachments, and any copies of it immediately. You
> should not retain, copy or use this e-mail or any attachment for any
> purpose, nor disclose all or any part of the contents to any other person.
> Thank you.
>



-- 
---
Takeshi Yamamuro


Re: Optimal way to re-partition from a single partition

2016-02-09 Thread Takeshi Yamamuro
The issue is not almost solved even in newer Spark.


On Wed, Feb 10, 2016 at 1:36 AM, Cesar Flores <ces...@gmail.com> wrote:

> Well, actually I am observing a single partition no matter what my input
> is. I am using spark 1.3.1.
>
> For what you both are saying, it appears that this sorting issue (going to
> a single partition after applying orderBy in a DF) is solved in later
> version of Spark? Well, if that is the case, I guess I just need to wait
> until my workplace decides to update.
>
>
> Thanks a lot
>
> On Tue, Feb 9, 2016 at 9:39 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
>> `HashPartitioning`.
>> `RangePartitioning` roughly samples input data and internally computes
>> partition bounds
>> to split given rows into `spark.sql.shuffle.partitions` partitions.
>> Therefore, when sort keys are highly skewed, I think some partitions
>> could end up being empty
>> (that is, # of result partitions is lower than `spark.sql.shuffle.partitions`
>> .
>>
>>
>> On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9...@gmail.com>
>> wrote:
>>
>>> For sql shuffle operations like groupby, the number of output partitions
>>> is controlled by spark.sql.shuffle.partitions. But, it seems orderBy does
>>> not honour this.
>>>
>>> In my small test, I could see that the number of partitions  in DF
>>> returned by orderBy was equal to the total number of distinct keys. Are you
>>> observing the same, I mean do you have a single value for all rows in the
>>> column on which you are running orderBy? If yes, you are better off not
>>> running the orderBy clause.
>>>
>>> May be someone from spark sql team could answer that how should the
>>> partitioning of the output DF be handled when doing an orderBy?
>>>
>>> Hemant
>>> www.snappydata.io
>>> https://github.com/SnappyDataInc/snappydata
>>>
>>>
>>>
>>>
>>> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <ces...@gmail.com> wrote:
>>>
>>>>
>>>> I have a data frame which I sort using orderBy function. This operation
>>>> causes my data frame to go to a single partition. After using those
>>>> results, I would like to re-partition to a larger number of partitions.
>>>> Currently I am just doing:
>>>>
>>>> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
>>>> partition and around 14 million records
>>>> val newDF =  hc.createDataFrame(rdd, df.schema)
>>>>
>>>> This process is really slow. Is there any other way of achieving this
>>>> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>>>>
>>>>
>>>> Thanks a lot
>>>> --
>>>> Cesar Flores
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Cesar Flores
>



-- 
---
Takeshi Yamamuro


Re: Unpersist RDD in Graphx

2016-02-01 Thread Takeshi Yamamuro
Hi,

Please call "Graph#unpersist" that releases two RDDs, vertex and edge ones.
"Graph#unpersist"  just invokes "Graph#unpersistVertices" and
"Graph#edges#unpersist";
"Graph#unpersistVertices" releases memory for vertices and
"Graph#edges#unpersist"
does memory for edges.
If blocking = true,  unpersist() waits until memory released from
BlockManager.



On Mon, Feb 1, 2016 at 8:35 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
wrote:

> Hi, What is he best way to unpersist the RDD in graphx to release memory?
> RDD.unpersist
> or
> RDD.unpersistVertices and RDD..edges.unpersist
>
> I study the source code of Pregel.scala, Both of above were used between
> line 148 and line 150. Can anyone please tell me what the different? In
> addition, what is the difference to set blocking = false and blocking =
> true?
>
> oldMessages.unpersist(blocking = false)
> prevG.unpersistVertices(blocking = false)
> prevG.edges.unpersist(blocking = false)
>
> Thanks,
>
> Jingyu
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.




-- 
---
Takeshi Yamamuro


Re: Guidelines for writing SPARK packages

2016-02-03 Thread Takeshi Yamamuro
Hi,

A package I maintain (https://github.com/maropu/hivemall-spark) extends
existing SparkSQL/DataFrame classes for a third-party library.
Please use this as a concrete example.

Thanks,
takeshi

On Tue, Feb 2, 2016 at 6:20 PM, Praveen Devarao <praveen...@in.ibm.com>
wrote:

> Thanks David.
>
> I am looking at extending the SparkSQL library with a custom
> package...hence was looking at more from details on any specific classes to
> be extended or implement (with) to achieve the redirect of calls to my
> module (when using .format).
>
> If you have any info on these lines do share with me...else debugging
> through would be the way :-)
>
> Thanking You
>
> Praveen Devarao
>
>
>
> From:David Russell <themarchoffo...@gmail.com>
> To:Praveen Devarao/India/IBM@IBMIN
> Cc:user <user@spark.apache.org>
> Date:01/02/2016 07:03 pm
> Subject:Re: Guidelines for writing SPARK packages
> Sent by:marchoffo...@gmail.com
> --
>
>
>
> Hi Praveen,
>
> The basic requirements for releasing a Spark package on
> spark-packages.org are as follows:
>
> 1. The package content must be hosted by GitHub in a public repo under
> the owner's account.
> 2. The repo name must match the package name.
> 3. The master branch of the repo must contain "README.md" and "LICENSE".
>
> Per the doc on spark-packages.org site an example package that meets
> those requirements can be found at
> https://github.com/databricks/spark-avro. My own recently released
> SAMBA package also meets these requirements:
> https://github.com/onetapbeyond/lambda-spark-executor.
>
> As you can see there is nothing in this list of requirements that
> demands the implementation of specific interfaces. What you'll need to
> implement will depend entirely on what you want to accomplish. If you
> want to register a release for your package you will also need to push
> the artifacts for your package to Maven central.
>
> David
>
>
> On Mon, Feb 1, 2016 at 7:03 AM, Praveen Devarao <praveen...@in.ibm.com>
> wrote:
> > Hi,
> >
> > Is there any guidelines or specs to write a Spark package? I
> would
> > like to implement a spark package and would like to know the way it
> needs to
> > be structured (implement some interfaces etc) so that it can plug into
> Spark
> > for extended functionality.
> >
> > Could any one help me point to docs or links on the above?
> >
> > Thanking You
> >
> > Praveen Devarao
>
>
>
> --
> "All that is gold does not glitter, Not all those who wander are lost."
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: Re: About cache table performance in spark sql

2016-02-04 Thread Takeshi Yamamuro
Hi,

Parquet data are column-wise and highly compressed, so the size of
deserialized rows in spark
could be bigger than that of parquet data on disk.
That is, I think that  24.59GB of parquet data becomes (18.1GB + 23.6GB)
data in spark.

Yes, you know cached data in spark also are compressed by default though,
spark uses simpler compression algorithms than parquet does and
ISTM the compression ratios are typically worse than those of parquet.


On Thu, Feb 4, 2016 at 3:16 PM, fightf...@163.com <fightf...@163.com> wrote:

> Hi,
> Thanks a lot for your explaination. I know that the slow process mainly
> caused by GC pressure and I had understand this difference
> just from your advice.
>
> I had each executor memory with 6GB and try to cache table.
> I had 3 executors and finally I can see some info from the spark job ui
> storage, like the following:
>
>
> RDD Name Storage Level Cached Partitions Fraction Cached Size in Memory
> Size in ExternalBlockStore Size on Disk
> In-memory table video1203 Memory Deserialized 1x Replicated 251 100%
> 18.1 GB 0.0 B 23.6 GB
>
> I can see that spark sql try to cache data into memory. And when I ran the
> following queries over this table video1203, I can get
> fast response. Another thing that confused me is that the above data size
> (in memory and on Disk). I can see that the in memory
> data size is 18.1GB, which almost equals sum of my executor memory. But
> why the Disk size if 23.6GB? From impala I get the overall
> parquet file size if about 24.59GB. Would be good to had some correction
> on this.
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Prabhu Joseph <prabhujose.ga...@gmail.com>
> *Date:* 2016-02-04 14:35
> *To:* fightf...@163.com
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: About cache table performance in spark sql
> Sun,
>
>When Executor don't have enough memory and if it tries to cache the
> data, it spends lot of time on GC and hence the job will be slow. Either,
>
>  1. We should allocate enough memory to cache all RDD and hence the
> job will complete fast
> Or 2. Don't use cache when there is not enough Executor memory.
>
>   To check the GC time, use  --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" while submitting the job and SPARK_WORKER_DIR will
> have sysout with GC.
> The sysout will show many "Full GC" happening when cache is used and
> executor does not have enough heap.
>
>
> Thanks,
> Prabhu Joseph
>
> On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com <fightf...@163.com>
> wrote:
>
>> Hi,
>>
>> I want to make sure that the cache table indeed would accelerate sql
>> queries. Here is one of my use case :
>>   impala table size : 24.59GB,  no partitions, with about 1 billion+ rows.
>> I use sqlContext.sql to run queries over this table and try to do cache
>> and uncache command to see if there
>> is any performance disparity. I ran the following query :
>> select * from video1203 where id > 10 and id < 20 and added_year != 1989
>> I can see the following results :
>>
>> 1  If I did not run cache table and just ran sqlContext.sql(), I can see
>> the above query run about 25 seconds.
>> 2  If I firstly run sqlContext.cacheTable("video1203"), the query runs
>> super slow and would cause driver OOM exception, but I can
>> get final results with about running 9 minuts.
>>
>> Would any expert can explain this for me ? I can see that cacheTable
>> cause OOM just because the in-memory columnar storage
>> cannot hold the 24.59GB+ table size into memory. But why the performance
>> is so different and even so bad ?
>>
>> Best,
>> Sun.
>>
>> --
>> fightf...@163.com
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Getting the size of a broadcast variable

2016-02-01 Thread Takeshi Yamamuro
Hi,

Currently, there is no way to check the size except for snooping INFO-logs
in a driver;

16/02/02 14:51:53 INFO BlockManagerInfo: Added rdd_2_12 in memory on
localhost:58536 (size: 40.0 B, free: 510.7 MB)



On Tue, Feb 2, 2016 at 8:20 AM, apu mishra . rr <apumishra...@gmail.com>
wrote:

> How can I determine the size (in bytes) of a broadcast variable? Do I need
> to use the .dump method and then look at the size of the result, or is
> there an easier way?
>
> Using PySpark with Spark 1.6.
>
> Thanks!
>
> Apu
>



-- 
---
Takeshi Yamamuro


Re: spark.local.dir configuration

2016-02-24 Thread Takeshi Yamamuro
Hi,

No, there is no way to change local dir paths after Worker initialized.
That is, dir paths are cached when a first executor is launched, then
following executors reference the paths.
Details can be found in codes below;
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L449
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L679


On Wed, Feb 24, 2016 at 5:13 PM, Jung <jb_j...@naver.com> wrote:

> Hi all,
> In the standalone mode, spark.local.dir is ignored after Spark Worker
> launched.
> This is a scenario assuming that we have 1 master node and 1 worker node.
> 1. $SPARK_HOME/sbin/start-all.sh to launch Spark Master
> 2. Modify worker node configuration($SPARK_HOME/conf/spark-defaults.conf)
> spark.local.dir to another directory( /tmp_new).
> 3. run spark-shell from master node.
>
> Executor in this scenario creates scratch directory like
> "spark-bb0876f2-7fa9-4f15-b790-24252183a4f1" under /tmp not /tmp_new.
> Because worker set immutable SparkConf instance at the first time it
> launched and refer to this variable when create new executor which wants to
> change its scratch dir.
> Can I change application's spark.local.dir without restarting spark
> workers?
>
> Thanks,
> Jung




-- 
---
Takeshi Yamamuro


Re: DirectFileOutputCommiter

2016-02-25 Thread Takeshi Yamamuro
Hi,

Great work!
What is the concrete performance gain of the committer on s3?
I'd like to know.

I think there is no direct committer for files because these kinds of
committer has risks
to loss data (See: SPARK-10063).
Until this resolved, ISTM files cannot support direct commits.

thanks,



On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <teng...@gmail.com> wrote:

> yes, should be this one
> https://gist.github.com/aarondav/c513916e72101bbe14ec
>
> then need to set it in spark-defaults.conf :
> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>
> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
> > The header of DirectOutputCommitter.scala says Databricks.
> > Did you get it from Databricks ?
> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <teng...@gmail.com> wrote:
> >>
> >> interesting in this topic as well, why the DirectFileOutputCommitter
> not included?
> >> we added it in our fork,
> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
> >> moreover, this DirectFileOutputCommitter is not working for the insert
> operations in HiveContext, since the Committer is called by hive (means
> uses dependencies in hive package)
> >> we made some hack to fix this, you can take a look:
> >>
> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
> >>
> >> may bring some ideas to other spark contributors to find a better way
> to use s3.
> >>
> >> 2016-02-22 23:18 GMT+01:00 igor.berman <igor.ber...@gmail.com>:
> >>>
> >>> Hi,
> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
> alikes
> >>> especially when working with s3?
> >>> I know that there is one impl in spark distro for parquet format, but
> not
> >>> for files -  why?
> >>>
> >>> Imho, it can bring huge performance boost.
> >>> Using default FileOutputCommiter with s3 has big overhead at commit
> stage
> >>> when all parts are copied one-by-one to destination dir from
> _temporary,
> >>> which is bottleneck when number of partitions is high.
> >>>
> >>> Also, wanted to know if there are some problems when using
> >>> DirectFileOutputCommitter?
> >>> If writing one partition directly will fail in the middle is spark will
> >>> notice this and will fail job(say after all retries)?
> >>>
> >>> thanks in advance
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> -
> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>> For additional commands, e-mail: user-h...@spark.apache.org
> >>>
> >>
> >
> >
>



-- 
---
Takeshi Yamamuro


Re: Bug in DiskBlockManager subDirs logic?

2016-02-25 Thread Takeshi Yamamuro
Hi,

Could you make simple codes to reproduce the issue?
I'm not exactly sure why shuffle data on temp dir. are wrongly deleted.

thanks,



On Fri, Feb 26, 2016 at 6:00 AM, Zee Chen <zeo...@gmail.com> wrote:

> Hi,
>
> I am debugging a situation where SortShuffleWriter sometimes fail to
> create a file, with the following stack trace:
>
> 16/02/23 11:48:46 ERROR Executor: Exception in task 13.0 in stage
> 47827.0 (TID 1367089)
> java.io.FileNotFoundException:
>
> /tmp/spark-9dd8dca9-6803-4c6c-bb6a-0e9c0111837c/executor-129dfdb8-9422-4668-989e-e789703526ad/blockmgr-dda6e340-7859-468f-b493-04e4162d341a/00/temp_shuffle_69fe1673-9ff2-462b-92b8-683d04669aad
> (No such file or directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.(FileOutputStream.java:213)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:110)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I checked the linux file system (ext4) and saw the /00/ subdir is
> missing. I went through the heap dump of the
> CoarseGrainedExecutorBackend jvm proc and found that
> DiskBlockManager's subDirs list had more non-null 2-hex subdirs than
> present on the file system! As a test I created all 64 2-hex subdirs
> by hand and then the problem went away.
>
> So had anybody else seen this problem? Looking at the relevant logic
> in DiskBlockManager and it hasn't changed much since the fix to
> https://issues.apache.org/jira/browse/SPARK-6468
>
> My configuration:
> spark-1.5.1, hadoop-2.6.0, standalone, oracle jdk8u60
>
> Thanks,
> Zee
>
> -----
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: merge join already sorted data?

2016-02-25 Thread Takeshi Yamamuro
Hi,

SparkSQL inside can put order assumptions on columns (OrderedDistribution)
though,
JDBC datasources does not support this; spark is not sure how columns
loaded from databases are ordered.
Also, there is no way to let spark know this order.

thanks,



On Fri, Feb 26, 2016 at 2:22 PM, Ken Geis <geis@gmail.com> wrote:

> I am loading data from two different databases and joining it in Spark.
> The data is indexed in the database, so it is efficient to retrieve the
> data ordered by a key. Can I tell Spark that my data is coming in ordered
> on that key so that when I join the data sets, they will be joined with
> little shuffling via a merge join?
>
> I know that Flink supports this, but its JDBC support is pretty lacking in
> general.
>
>
> Thanks,
>
> Ken
>
>


-- 
---
Takeshi Yamamuro


Re: DirectFileOutputCommiter

2016-02-29 Thread Takeshi Yamamuro
Hi,

I think the essential culprit is that these committers are not idempotent;
retry attempts will fail.
See codes below for details;
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala#L130

On Sat, Feb 27, 2016 at 7:38 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> Hi Reynold,
> thanks for the response
> Yes, speculation mode needs some coordination.
> Regarding job failure :
> correct me if I wrong - if one of jobs fails - client code will be sort of
> "notified" by exception or something similar, so the client can decide to
> re-submit action(job), i.e. it won't be "silent" failure.
>
>
> On 26 February 2016 at 11:50, Reynold Xin <r...@databricks.com> wrote:
>
>> It could lose data in speculation mode, or if any job fails.
>>
>> On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> Takeshi, do you know the reason why they wanted to remove this commiter
>>> in SPARK-10063?
>>> the jira has no info inside
>>> as far as I understand the direct committer can't be used when either of
>>> two is true
>>> 1. speculation mode
>>> 2. append mode(ie. not creating new version of data but appending to
>>> existing data)
>>>
>>> On 26 February 2016 at 08:24, Takeshi Yamamuro <linguin@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Great work!
>>>> What is the concrete performance gain of the committer on s3?
>>>> I'd like to know.
>>>>
>>>> I think there is no direct committer for files because these kinds of
>>>> committer has risks
>>>> to loss data (See: SPARK-10063).
>>>> Until this resolved, ISTM files cannot support direct commits.
>>>>
>>>> thanks,
>>>>
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <teng...@gmail.com> wrote:
>>>>
>>>>> yes, should be this one
>>>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>>>
>>>>> then need to set it in spark-defaults.conf :
>>>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>>>
>>>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>>>> > The header of DirectOutputCommitter.scala says Databricks.
>>>>> > Did you get it from Databricks ?
>>>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <teng...@gmail.com> wrote:
>>>>> >>
>>>>> >> interesting in this topic as well, why
>>>>> the DirectFileOutputCommitter not included?
>>>>> >> we added it in our fork,
>>>>> under 
>>>>> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>>>> insert operations in HiveContext, since the Committer is called by hive
>>>>> (means uses dependencies in hive package)
>>>>> >> we made some hack to fix this, you can take a look:
>>>>> >>
>>>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>>>> >>
>>>>> >> may bring some ideas to other spark contributors to find a better
>>>>> way to use s3.
>>>>> >>
>>>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <igor.ber...@gmail.com>:
>>>>> >>>
>>>>> >>> Hi,
>>>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>>>> alikes
>>>>> >>> especially when working with s3?
>>>>> >>> I know that there is one impl in spark distro for parquet format,
>>>>> but not
>>>>> >>> for files -  why?
>>>>> >>>
>>>>> >>> Imho, it can bring huge performance boost.
>>>>> >>> Using default FileOutputCommiter with s3 has big overhead at
>>>>> commit stage
>>>>> >>> when all parts are copied one-by-one to destination dir from
>>>>> _temporary,
>>>>> >>> which is bottleneck when number of partitions is high.
>>>>> >>>
>>>>> >>> Also, wanted to know if there are some problems when using
>>>>> >>> DirectFileOutputCommitter?
>>>>> >>> If writing one partition directly will fail in the middle is spark
>>>>> will
>>>>> >>> notice this and will fail job(say after all retries)?
>>>>> >>>
>>>>> >>> thanks in advance
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>>>> >>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>> >>>
>>>>> >>>
>>>>> -
>>>>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>> >>>
>>>>> >>
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Number partitions after a join

2016-02-25 Thread Takeshi Yamamuro
Hi,

The number depends on `spark.sql.shuffle.partitions`.
See:
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

On Thu, Feb 25, 2016 at 7:42 PM, Guillermo Ortiz <konstt2...@gmail.com>
wrote:

> When you do a join in Spark, how many partitions are as result? is it a
> default number if you don't specify the number of partitions?
>



-- 
---
Takeshi Yamamuro


Re: Force Partitioner to use entire entry of PairRDD as key

2016-02-22 Thread Takeshi Yamamuro
You're correct, reduceByKey is just an example.

On Tue, Feb 23, 2016 at 10:57 AM, Jay Luan <jaylu...@gmail.com> wrote:

> Could you elaborate on how this would work?
>
> So from what I can tell, this maps a key to a tuple which always has a 0
> as the second element. From there the hash widely changes because we now
> hash something like ((1,4), 0) and ((1,3), 0). Thus mapping this would
> create more even partitions. Why reduce by key after? Is that just an
> example of an operation that can be done? Or does it provide some kind of
> real value to the operation.
>
>
>
> On Mon, Feb 22, 2016 at 5:48 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> How about adding dummy values?
>> values.map(d => (d, 0)).reduceByKey(_ + _)
>>
>> On Tue, Feb 23, 2016 at 10:15 AM, jluan <jaylu...@gmail.com> wrote:
>>
>>> I was wondering, is there a way to force something like the hash
>>> partitioner
>>> to use the entire entry of a PairRDD as a hash rather than just the key?
>>>
>>> For Example, if we have an RDD with values: PairRDD = [(1,4), (1, 3), (2,
>>> 3), (2,5), (2, 10)]. Rather than using keys 1 and 2, can we force the
>>> partitioner to hash the entire tuple such as (1,4)?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Force-Partitioner-to-use-entire-entry-of-PairRDD-as-key-tp26299.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Force Partitioner to use entire entry of PairRDD as key

2016-02-22 Thread Takeshi Yamamuro
Hi,

How about adding dummy values?
values.map(d => (d, 0)).reduceByKey(_ + _)

On Tue, Feb 23, 2016 at 10:15 AM, jluan <jaylu...@gmail.com> wrote:

> I was wondering, is there a way to force something like the hash
> partitioner
> to use the entire entry of a PairRDD as a hash rather than just the key?
>
> For Example, if we have an RDD with values: PairRDD = [(1,4), (1, 3), (2,
> 3), (2,5), (2, 10)]. Rather than using keys 1 and 2, can we force the
> partitioner to hash the entire tuple such as (1,4)?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Force-Partitioner-to-use-entire-entry-of-PairRDD-as-key-tp26299.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Spark SQL Optimization

2016-03-23 Thread Takeshi Yamamuro
Hi, all

What's the size of three tables? Also, what's the performance difference of
the two queries?

On Tue, Mar 22, 2016 at 3:53 PM, Rishi Mishra <rmis...@snappydata.io> wrote:

> What we have observed so far is Spark picks join order in the same order
> as tables in from clause is specified.  Sometimes reordering benefits the
> join query.
> This can be an inbuilt optimization in Spark. But again its not going to
> be straight forward, where rather than table size,  selectivity of Join is
> important.
> Probably some kind of heuristic might help.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Mon, Mar 21, 2016 at 11:18 PM, gtinside <gtins...@gmail.com> wrote:
>
>> More details :
>>
>> Execution plan for Original query
>> select distinct pge.portfolio_code
>> from table1 pge join table2 p
>> on p.perm_group = pge.anc_port_group
>> join table3 uge
>> on p.user_group=uge.anc_user_group
>> where uge.user_name = 'user' and p.perm_type = 'TEST'
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>  TungstenExchange hashpartitioning(portfolio_code#14119)
>>   TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>TungstenProject [portfolio_code#14119]
>> BroadcastHashJoin [user_group#13665], [anc_user_group#13658],
>> BuildRight
>>  TungstenProject [portfolio_code#14119,user_group#13665]
>>   BroadcastHashJoin [anc_port_group#14117], [perm_group#13667],
>> BuildRight
>>ConvertToUnsafe
>> Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>>ConvertToUnsafe
>> Project [user_group#13665,perm_group#13667]
>>  Filter (perm_type#13666 = TEST)
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
>>  ConvertToUnsafe
>>   Project [anc_user_group#13658]
>>Filter (user_name#13659 = user)
>> Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>>
>>
>>
>> Execution plan for optimized query
>> select distinct pge.portfolio_code
>> from table1 uge, table2 p, table3 pge
>> where uge.user_name = 'user' and p.perm_type = 'TEST'
>> and p.perm_group = pge.anc_port_group
>> and p.user_group=uge.anc_user_group
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>  TungstenExchange hashpartitioning(portfolio_code#14119)
>>   TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>TungstenProject [portfolio_code#14119]
>> BroadcastHashJoin [perm_group#13667], [anc_port_group#14117],
>> BuildRight
>>  TungstenProject [perm_group#13667]
>>   BroadcastHashJoin [anc_user_group#13658], [user_group#13665],
>> BuildRight
>>ConvertToUnsafe
>> Project [anc_user_group#13658]
>>  Filter (user_name#13659 = user)
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>>ConvertToUnsafe
>> Project [perm_group#13667,user_group#13665]
>>  Filter (perm_type#13666 = TEST)
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
>>  ConvertToUnsafe
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548p26553.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Forcing data from disk to memory

2016-03-25 Thread Takeshi Yamamuro
I'm not 100% sure what you wanna do though, how about caching whole data
and then querying?
yourRdd.cache.foreach(_)

On Fri, Mar 25, 2016 at 12:22 AM, Daniel Imberman <daniel.imber...@gmail.com
> wrote:

> Hi Takeshi,
>
> Thank you for getting back to me. If this is not possible then perhaps you
> can help me with the root problem that caused me to ask this question.
>
> Basically I have a job where I'm loading/persisting an RDD and running
> queries against it. The problem I'm having is that even though there is
> plenty of space in memory, the RDD is not fully persisting. Once I run
> multiple queries against it the RDD fully persists, but this means that the
> first 4/5 queries I run are extremely slow.
>
> Is there any way I can make sure that the entire RDD ends up in memory the
> first time I load it?
>
> Thank you
>
> On Thu, Mar 24, 2016 at 1:21 AM Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> just re-sent,
>>
>>
>> -- Forwarded message --
>> From: Takeshi Yamamuro <linguin@gmail.com>
>> Date: Thu, Mar 24, 2016 at 5:19 PM
>> Subject: Re: Forcing data from disk to memory
>> To: Daniel Imberman <daniel.imber...@gmail.com>
>>
>>
>> Hi,
>>
>> We have no direct approach; we need to unpersist cached data, then
>> re-cache data as MEMORY_ONLY.
>>
>> // maropu
>>
>> On Thu, Mar 24, 2016 at 8:22 AM, Daniel Imberman <
>> daniel.imber...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> So I have a question about persistence. Let's say I have an RDD that's
>>> persisted MEMORY_AND_DISK, and I know that I now have enough memory space
>>> cleared up that I can force the data on disk into memory. Is it possible
>>> to
>>> tell spark to re-evaluate the open RDD memory and move that information?
>>>
>>> Thank you
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-data-from-disk-to-memory-tp26585.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


-- 
---
Takeshi Yamamuro


Re: Does SparkSql has official jdbc/odbc driver?

2016-03-25 Thread Takeshi Yamamuro
Hi,

No, you need to use the third-party's ones.

// maropu

On Fri, Mar 25, 2016 at 3:33 PM, sage <lkke...@gmail.com> wrote:

> Hi all,
> Does SparkSql has official jdbc/odbc driver? I only saw third-party's
> jdbc/odbc driver.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-SparkSql-has-official-jdbc-odbc-driver-tp26591.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Spark and DB connection pool

2016-03-23 Thread Takeshi Yamamuro
Hi,

Currently, Spark itself doesn't pool JDBC connections.
If you face performance difficulty, all you can do is to cache loaded data
from RDB and Cassandra in Spark.

thanks,
maropu

On Wed, Mar 23, 2016 at 11:56 PM, rjtokenring <ing.marco.colo...@gmail.com>
wrote:

> Hi all, is there a way in spark to setup a connection pool?
> As example: I'm going to use a relational DB and Cassandra to join data
> between them.
> How can I control and cache DB connections?
>
> Thanks all!
>
> Mark
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-DB-connection-pool-tp26577.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Best way to determine # of workers

2016-03-24 Thread Takeshi Yamamuro
Hi,

There is no way to get such information from your app.
Why do you need that?

thanks,
maropu

On Thu, Mar 24, 2016 at 8:23 AM, Ajaxx <ajack...@pobox.com> wrote:

> I'm building some elasticity into my model and I'd like to know when my
> workers have come online.  It appears at present that the API only supports
> getting information about applications.  Is there a good way to determine
> how many workers are available?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-determine-of-workers-tp26586.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Fwd: Forcing data from disk to memory

2016-03-24 Thread Takeshi Yamamuro
just re-sent,


-- Forwarded message --
From: Takeshi Yamamuro <linguin@gmail.com>
Date: Thu, Mar 24, 2016 at 5:19 PM
Subject: Re: Forcing data from disk to memory
To: Daniel Imberman <daniel.imber...@gmail.com>


Hi,

We have no direct approach; we need to unpersist cached data, then
re-cache data as MEMORY_ONLY.

// maropu

On Thu, Mar 24, 2016 at 8:22 AM, Daniel Imberman <daniel.imber...@gmail.com>
wrote:

> Hi all,
>
> So I have a question about persistence. Let's say I have an RDD that's
> persisted MEMORY_AND_DISK, and I know that I now have enough memory space
> cleared up that I can force the data on disk into memory. Is it possible to
> tell spark to re-evaluate the open RDD memory and move that information?
>
> Thank you
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-data-from-disk-to-memory-tp26585.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro



-- 
---
Takeshi Yamamuro


Re: OOM exception during Broadcast

2016-03-07 Thread Takeshi Yamamuro
Oh,  How about increasing broadcast block size
in spark.broadcast.blockSize?
A default size is `4m` and it is too small agains ~1GB, I think.

On Tue, Mar 8, 2016 at 10:44 AM, Arash <aras...@gmail.com> wrote:

> Hi Tristan,
>
> This is not static, I actually collect it from an RDD to the driver.
>
> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon <st...@memeticlabs.org>
> wrote:
>
>> Hi Arash,
>>
>> is this static data?  Have you considered including it in your jars and
>> de-serializing it from jar on each worker node?
>> It’s not pretty, but it’s a workaround for serialization troubles.
>>
>> On Mar 7, 2016, at 5:29 PM, Arash <aras...@gmail.com> wrote:
>>
>> Hello all,
>>
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>> but haven't been able to make it work so far.
>>
>> It looks like the executors start to run out of memory during
>> deserialization. This behavior only shows itself when the number of
>> partitions is above a few 10s, the broadcast does work for 10 or 20
>> partitions.
>>
>> I'm using the following setup to observe the problem:
>>
>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>> tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>>
>> If I set the number of partitions for numsRDD to 20, the count goes
>> through successfully, but at 100, I'll start to get errors such as:
>>
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>> space
>> at
>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>> at
>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>
>>
>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>> according to spark ui environment). We're also using kryo serialization and
>> Yarn is the resource manager.
>>
>> Any ideas as what might be going wrong and how to debug this?
>>
>> Thanks,
>> Arash
>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark reduce serialization question

2016-03-06 Thread Takeshi Yamamuro
Hi,

I'm not exactly sure what's your codes like though, ISTM this is a correct
behaviour.
If the size of data that a driver fetches exceeds the limit, the driver
throws this exception.
(See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L68
)
So, in your case, your driver tries to fetch 1345.5 MB data of 4 models
from executors, then it fails.
Thanks,

On Sat, Mar 5, 2016 at 6:11 AM, James Jia <james...@berkeley.edu> wrote:

> I'm running a distributed KMeans algorithm with 4 executors.
>
> I have a RDD[Data]. I use mapPartition to run a learner on each data 
> partition, and then call reduce with my custom model reduce function to 
> reduce the result of the model to start a new iteration.
>
> The model size is around ~330 MB. I would expect that the required memory for 
> the serialized result at the driver to be at most 2*300 MB in order to reduce 
> two models, but it looks like Spark is serializing all of the models to the 
> driver before reducing.
>
> The error message says that the total size of the serialized results is 
> 1345.5MB, which is approximately 4 * 330 MB. I know I can set the driver's 
> max result size, but I just want to confirm that this is expected behavior.
>
> Thanks!
>
> James
>
> Stage 0:==>(1 + 3) / 
> 4]16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 
> 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 4 tasks (1345.5 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
>
>   at org.apache.spark.scheduler.DAGScheduler.org 
> <http://org.apache.spark.scheduler.dagscheduler.org/>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>
>   at scala.Option.foreach(Option.scala:257)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
>
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
>
>   at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
>
>   ... 50 elided
>
>


-- 
---
Takeshi Yamamuro


Re: data frame problem preserving sort order with repartition() and coalesce()

2016-03-30 Thread Takeshi Yamamuro
Hi,

"csvDF = csvDF.sort(orderByColName, ascending=False)" repartitions DF by
using RangePartitioner
(#partitions depends on "spark.sql.shuffle.partitions").
Seems, in your case, some empty partitions were removed, then you got 17
paritions.

// maropu

On Wed, Mar 30, 2016 at 6:49 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have a requirement to write my results out into a series of CSV files.
> No file may have more than 100 rows of data. In the past my data was not
> sorted, and I was able to use reparation() or coalesce() to ensure the
> file length requirement.
>
> I realize that reparation() cause the data to be shuffled. It appears that
> changes the data ordering. So I sort the repartioned data again.
>
> What is really strange is I no longer get the number of output files I am
> expecting, and the number of lines constraint is not violated
>
> I am using spark-1.6.1
>
> Andy
>
> $ for i in topTags_CSV/*.csv; do wc -l $i; done
>
>   19 topTags_CSV/part-0.csv
>
>   19 topTags_CSV/part-1.csv
>
>   20 topTags_CSV/part-2.csv
>
>   19 topTags_CSV/part-3.csv
>
>   22 topTags_CSV/part-4.csv
>
>   19 topTags_CSV/part-5.csv
>
>   26 topTags_CSV/part-6.csv
>
>   18 topTags_CSV/part-7.csv
>
>   12 topTags_CSV/part-8.csv
>
>   25 topTags_CSV/part-9.csv
>
>   32 topTags_CSV/part-00010.csv
>
>   53 topTags_CSV/part-00011.csv
>
>   89 topTags_CSV/part-00012.csv
>
>  146 topTags_CSV/part-00013.csv
>
>  387 topTags_CSV/part-00014.csv
>
> 2708 topTags_CSV/part-00015.csv
>
>1 topTags_CSV/part-00016.csv
>
> $
>
> numRowsPerCSVFile = 100
>
> numRows = resultDF.count()
>
> quotient, remander = divmod(numRows, numRowsPerCSVFile)
>
> numPartitions = (quotient + 1) if remander > 0 else quotient
>
> ​
>
> debugStr = ("numRows:{0} quotient:{1} remander:{2} repartition({3})"
>
> .format(numRows, quotient, remander, numPartitions))
>
> print(debugStr)
>
> ​
>
> csvDF = resultDF.coalesce(numPartitions)
>
> ​
>
> orderByColName = "count"
>
> csvDF = csvDF.sort(orderByColName, ascending=False)
>
> headerArg = 'true'# if headers else 'false'
>
> csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
>
> renamePartFiles(outputDir)
>
> numRows:3598 quotient:35 remander:98 repartition(36)
>
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: Cant join same dataframe twice ?

2016-04-26 Thread Takeshi Yamamuro
Based on my example, how about renaming columns?

val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
val df3 = df1.join(df2, "a").select($"a", df1("b").as("1-b"),
df2("b").as("2-b"))
val df4 = df3.join(df2, df3("2-b") === df2("b"))

// maropu

On Wed, Apr 27, 2016 at 1:58 PM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Correct Takeshi
> Even I am facing the same issue .
>
> How to avoid the ambiguity ?
>
>
> On 27 April 2016 at 11:54, Takeshi Yamamuro <linguin@gmail.com> wrote:
>
>> Hi,
>>
>> I tried;
>> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df3 = df1.join(df2, "a")
>> val df4 = df3.join(df2, "b")
>>
>> And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
>> ambiguous, could be: b#6, b#14.;
>> If same case, this message makes sense and this is clear.
>>
>> Thought?
>>
>> // maropu
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com>
>> wrote:
>>
>>> Also, check the column names of df1 ( after joining df2 and df3 ).
>>>
>>> Prasad.
>>>
>>> From: Ted Yu
>>> Date: Monday, April 25, 2016 at 8:35 PM
>>> To: Divya Gehlot
>>> Cc: "user @spark"
>>> Subject: Re: Cant join same dataframe twice ?
>>>
>>> Can you show us the structure of df2 and df3 ?
>>>
>>> Thanks
>>>
>>> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am using Spark 1.5.2 .
>>>> I have a use case where I need to join the same dataframe twice on two
>>>> different columns.
>>>> I am getting error missing Columns
>>>>
>>>> For instance ,
>>>> val df1 = df2.join(df3,"Column1")
>>>> Below throwing error missing columns
>>>> val df 4 = df1.join(df3,"Column2")
>>>>
>>>> Is the bug or valid scenario ?
>>>>
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Divya
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: removing header from csv file

2016-04-26 Thread Takeshi Yamamuro
Hi,

What do u mean "with sqlcontext only"?
You mean you'd like to load csv data as rdd (sparkcontext) or something?

// maropu

On Wed, Apr 27, 2016 at 2:24 PM, Ashutosh Kumar <kmr.ashutos...@gmail.com>
wrote:

> I see there is a library spark-csv which can be used for removing header
> and processing of csv files. But it seems it works with sqlcontext only. Is
> there a way to remove header from csv files without sqlcontext ?
>
> Thanks
> Ashutosh
>



-- 
---
Takeshi Yamamuro


Re: Cant join same dataframe twice ?

2016-04-26 Thread Takeshi Yamamuro
Yeah, I think so. This is a kind of common mistakes.

// maropu

On Wed, Apr 27, 2016 at 1:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> The ambiguity came from:
>
> scala> df3.schema
> res0: org.apache.spark.sql.types.StructType =
> StructType(StructField(a,IntegerType,false),
> StructField(b,IntegerType,false), StructField(b,IntegerType,false))
>
> On Tue, Apr 26, 2016 at 8:54 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> I tried;
>> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df3 = df1.join(df2, "a")
>> val df4 = df3.join(df2, "b")
>>
>> And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
>> ambiguous, could be: b#6, b#14.;
>> If same case, this message makes sense and this is clear.
>>
>> Thought?
>>
>> // maropu
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com>
>> wrote:
>>
>>> Also, check the column names of df1 ( after joining df2 and df3 ).
>>>
>>> Prasad.
>>>
>>> From: Ted Yu
>>> Date: Monday, April 25, 2016 at 8:35 PM
>>> To: Divya Gehlot
>>> Cc: "user @spark"
>>> Subject: Re: Cant join same dataframe twice ?
>>>
>>> Can you show us the structure of df2 and df3 ?
>>>
>>> Thanks
>>>
>>> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am using Spark 1.5.2 .
>>>> I have a use case where I need to join the same dataframe twice on two
>>>> different columns.
>>>> I am getting error missing Columns
>>>>
>>>> For instance ,
>>>> val df1 = df2.join(df3,"Column1")
>>>> Below throwing error missing columns
>>>> val df 4 = df1.join(df3,"Column2")
>>>>
>>>> Is the bug or valid scenario ?
>>>>
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Divya
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Cant join same dataframe twice ?

2016-04-26 Thread Takeshi Yamamuro
Hi,

I tried;
val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
val df3 = df1.join(df2, "a")
val df4 = df3.join(df2, "b")

And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
ambiguous, could be: b#6, b#14.;
If same case, this message makes sense and this is clear.

Thought?

// maropu







On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com> wrote:

> Also, check the column names of df1 ( after joining df2 and df3 ).
>
> Prasad.
>
> From: Ted Yu
> Date: Monday, April 25, 2016 at 8:35 PM
> To: Divya Gehlot
> Cc: "user @spark"
> Subject: Re: Cant join same dataframe twice ?
>
> Can you show us the structure of df2 and df3 ?
>
> Thanks
>
> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I am using Spark 1.5.2 .
>> I have a use case where I need to join the same dataframe twice on two
>> different columns.
>> I am getting error missing Columns
>>
>> For instance ,
>> val df1 = df2.join(df3,"Column1")
>> Below throwing error missing columns
>> val df 4 = df1.join(df3,"Column2")
>>
>> Is the bug or valid scenario ?
>>
>>
>>
>>
>> Thanks,
>> Divya
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Profiling memory use and access

2016-04-24 Thread Takeshi Yamamuro
Hi,

You can use YourKit to profile workloads and please see:
https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit

// maropu

On Mon, Apr 25, 2016 at 10:24 AM, Edmon Begoli <ebeg...@gmail.com> wrote:

> I am working on an experimental research into memory use and profiling of
> memory use and allocation by machine learning functions across number of
> popular libraries.
>
> Is there a facility within Spark, and MLlib specifically to track the
> allocation and use of data frames/memory by MLlib?
>
> Please advise.
>
> I will acknowledge any contributions in a paper, or add you as a co-author
> if you have any significant contribution (and if interested).
>
> Thank you,
> Edmon
>



-- 
---
Takeshi Yamamuro


Re: Error joining dataframes

2016-05-18 Thread Takeshi Yamamuro
Ah, yes. `df_join` has the two `id`, so you need to select which id you use;

scala> :paste

// Entering paste mode (ctrl-D to finish)


val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")

val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")

val df3 = df1.join(df2, df1("id") === df2("id"), "outer")

df3.printSchema

df3.select(df1("id")).show


// Exiting paste mode, now interpreting.


root

 |-- id: integer (nullable = true)

 |-- A: integer (nullable = true)

 |-- id: integer (nullable = true)

 |-- B: integer (nullable = true)


++

|  id|

++

|   1|

|   2|

|null|

++



On Wed, May 18, 2016 at 4:29 PM, ram kumar <ramkumarro...@gmail.com> wrote:

> When you register a temp table from the dataframe
>
> eg:
> var df_join = df1.join(df2, df1("id") === df2("id"), "outer")
> df_join.registerTempTable("test")
>
> sqlContext.sql("select * from test")
>
> +++++
>
> |  id|   A|  id|   B|
>
> +++++
>
> |   1|   0|null|null|
>
> |   2|   0|   2|   0|
>
> |null|null|   3|   0|
>
> +++++
>
>
> but, when you query the "id"
>
>
> sqlContext.sql("select id from test")
>
> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>
> On Wed, May 18, 2016 at 12:44 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Look weird, seems spark-v1.5.x can accept the query.
>> What's the difference between the example and your query?
>>
>> 
>>
>> Welcome to
>>
>>     __
>>
>>  / __/__  ___ _/ /__
>>
>> _\ \/ _ \/ _ `/ __/  '_/
>>
>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>>
>>   /_/
>>
>> scala> :paste
>>
>> // Entering paste mode (ctrl-D to finish)
>>
>> val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")
>>
>> val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")
>>
>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>
>>
>> // Exiting paste mode, now interpreting.
>>
>>
>> +++++
>>
>> |  id|   A|  id|   B|
>>
>> +++++
>>
>> |   1|   0|null|null|
>>
>> |   2|   0|   2|   0|
>>
>> |null|null|   3|   0|
>>
>> +++++
>>
>>
>> df1: org.apache.spark.sql.DataFrame = [id: int, A: int]
>>
>> df2: org.apache.spark.sql.DataFrame = [id: int, B: int]
>>
>>
>>
>>
>>
>> On Wed, May 18, 2016 at 3:52 PM, ram kumar <ramkumarro...@gmail.com>
>> wrote:
>>
>>> I tried
>>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>>
>>> But there is a duplicate "id" and when I query the "id", I get
>>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>>
>>> I am currently using spark 1.5.2.
>>> Is there any alternative way in 1.5
>>>
>>> Thanks
>>>
>>> On Wed, May 18, 2016 at 12:12 PM, Takeshi Yamamuro <
>>> linguin@gmail.com> wrote:
>>>
>>>> Also, you can pass the query that you'd like to use in spark-v1.6+;
>>>>
>>>> val df1 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "A")
>>>> val df2 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "B")
>>>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Wed, May 18, 2016 at 3:29 PM, ram kumar <ramkumarro...@gmail.com>
>>>> wrote:
>>>>
>>>>> If I run as
>>>>> val rs = s.join(t,"time_id").join(c,"channel_id")
>>>>>
>>>>> It takes as inner join.
>>>>>
>>>>>
>>>>> On Wed, May 18, 2016 at 2:31 AM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> pretty simple, a similar construct to tables projected as DF
>>>>>>
>>>>>> val c =
>>>>>> HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
>

Re: SPARK - DataFrame for BulkLoad

2016-05-18 Thread Takeshi Yamamuro
Hi,

Have you checked this?
http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E

// maropu

On Wed, May 18, 2016 at 1:14 PM, Mohanraj Ragupathiraj <
mohanaug...@gmail.com> wrote:

> I have 100 million records to be inserted to a HBase table (PHOENIX) as a
> result of a Spark Job. I would like to know if i convert it to a Dataframe
> and save it, will it do Bulk load (or) it is not the efficient way to write
> data to a HBase table
>
> --
> Thanks and Regards
> Mohan
>



-- 
---
Takeshi Yamamuro


Re: Error joining dataframes

2016-05-18 Thread Takeshi Yamamuro
You can use the api in spark-v1.6+.
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L454

// maropu

On Wed, May 18, 2016 at 3:16 PM, ram kumar <ramkumarro...@gmail.com> wrote:

> I tried
>
> scala> var df_join = df1.join(df2, "Id", "fullouter")
> :27: error: type mismatch;
>  found   : String("Id")
>  required: org.apache.spark.sql.Column
>var df_join = df1.join(df2, "Id", "fullouter")
>^
>
> scala>
>
> And I cant see the above method in
>
> https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html#join(org.apache.spark.sql.DataFrame,%20org.apache.spark.sql.Column,%20java.lang.String)
>
> On Wed, May 18, 2016 at 2:22 AM, Bijay Kumar Pathak <bkpat...@mtu.edu>
> wrote:
>
>> Hi,
>>
>> Try this one:
>>
>>
>> df_join = df1.*join*(df2, 'Id', "fullouter")
>>
>> Thanks,
>> Bijay
>>
>>
>> On Tue, May 17, 2016 at 9:39 AM, ram kumar <ramkumarro...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I tried to join two dataframe
>>>
>>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>>
>>> df_join.registerTempTable("join_test")
>>>
>>>
>>> When querying "Id" from "join_test"
>>>
>>> 0: jdbc:hive2://> *select Id from join_test;*
>>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>> 0: jdbc:hive2://>
>>>
>>> Is there a way to merge the value of df1("Id") and df2("Id") into one
>>> "Id"
>>>
>>> Thanks
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Is there a way to merge parquet small files?

2016-05-20 Thread Takeshi Yamamuro
Many small files could cause technical issues in both hdfs and spark
though, they do not
generate many stages and tasks in the recent version of spark.

// maropu

On Fri, May 20, 2016 at 2:41 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> For logs file I would suggest save as gziped text file first.  After
> aggregation, convert them into parquet by merging a few files.
>
>
>
> On May 19, 2016, at 22:32, Deng Ching-Mallete <och...@apache.org> wrote:
>
> IMO, it might be better to merge or compact the parquet files instead of
> keeping lots of small files in the HDFS. Please refer to [1] for more info.
>
> We also encountered the same issue with the slow query, and it was indeed
> caused by the many small parquet files. In our case, we were processing
> large data sets with batch jobs instead of a streaming job. To solve our
> issue, we just did a coalesce to reduce the number of partitions before
> saving as parquet format.
>
> HTH,
> Deng
>
> [1] http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
>
> On Fri, May 20, 2016 at 1:50 PM, 王晓龙/0515 <roland8...@cmbchina.com>
> wrote:
>
>> I’m using a spark streaming program to store log message into parquet
>> file every 10 mins.
>> Now, when I query the parquet, it usually takes hundreds of thousands of
>> stages to compute a single count.
>> I looked into the parquet file’s path and find a great amount of small
>> files.
>>
>> Do the small files caused the problem? Can I merge them, or is there a
>> better way to solve it?
>>
>> Lots of thanks.
>>
>> 
>>
>> 此邮件内容仅代表发送者的个人观点和意见,与招商银行股份有限公司及其下属分支机构的观点和意见无关,招商银行股份有限公司及其下属分支机构不对此邮件内容承担任何责任。此邮件内容仅限收件人查阅,如误收此邮件请立即删除。
>>
>> -----
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark.default.parallelism can not set reduce number

2016-05-20 Thread Takeshi Yamamuro
You need to use `spark.sql.shuffle.partitions`.

// maropu

On Fri, May 20, 2016 at 8:17 PM, 喜之郎 <251922...@qq.com> wrote:

>  Hi all.
> I set Spark.default.parallelism equals 20 in spark-default.conf. And send
> this file to all nodes.
> But I found reduce number is still default value,200.
> Does anyone else encouter this problem? can anyone give some advice?
>
> 
> [Stage 9:>(0 + 0)
> / 200]
> [Stage 9:>(0 + 2)
> / 200]
> [Stage 9:>(1 + 2)
> / 200]
> [Stage 9:>(2 + 2)
> / 200]
> ###
>
> And this results in many empty files.Because my data is little, only some
> of the 200 files have data.
> ###
>  2016-05-20 17:01
> /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-0
>  2016-05-20 17:01
> /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-1
>  2016-05-20 17:01
> /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-2
>  2016-05-20 17:01
> /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-3
>  2016-05-20 17:01
> /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-4
>  2016-05-20 17:01
> /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-5
> 
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark 1.6 Catalyst optimizer

2016-05-12 Thread Takeshi Yamamuro
Hi,

What's the result of `df3.explain(true)`?

// maropu

On Thu, May 12, 2016 at 10:04 AM, Telmo Rodrigues <
telmo.galante.rodrig...@gmail.com> wrote:

> I'm building spark from branch-1.6 source with mvn -DskipTests package and
> I'm running the following code with spark shell.
>
> *val* sqlContext *=* *new* org.apache.spark.sql.*SQLContext*(sc)
>
> *import* *sqlContext.implicits._*
>
>
> *val df = sqlContext.read.json("persons.json")*
>
> *val df2 = sqlContext.read.json("cars.json")*
>
>
> *df.registerTempTable("t")*
>
> *df2.registerTempTable("u")*
>
>
> *val d3 =sqlContext.sql("select * from t join u on t.id <http://t.id> =
> u.id <http://u.id> where t.id <http://t.id> = 1")*
>
> With the log4j root category level WARN, the last printed messages refers
> to the Batch Resolution rules execution.
>
> === Result of Batch Resolution ===
> !'Project [unresolvedalias(*)]  Project [id#0L,id#1L]
> !+- 'Filter ('t.id = 1) +- Filter (id#0L = cast(1 as
> bigint))
> !   +- 'Join Inner, Some(('t.id = 'u.id))  +- Join Inner, Some((id#0L
> = id#1L))
> !  :- 'UnresolvedRelation `t`, None   :- Subquery t
> !  +- 'UnresolvedRelation `u`, None   :  +- Relation[id#0L]
> JSONRelation
> ! +- Subquery u
> !+- Relation[id#1L]
> JSONRelation
>
>
> I think that only the analyser rules are being executed.
>
> The optimiser rules should not to run in this case?
>
> 2016-05-11 19:24 GMT+01:00 Michael Armbrust <mich...@databricks.com>:
>
>>
>>> logical plan after optimizer execution:
>>>
>>> Project [id#0L,id#1L]
>>> !+- Filter (id#0L = cast(1 as bigint))
>>> !   +- Join Inner, Some((id#0L = id#1L))
>>> !  :- Subquery t
>>> !  :  +- Relation[id#0L] JSONRelation
>>> !  +- Subquery u
>>> !  +- Relation[id#1L] JSONRelation
>>>
>>
>> I think you are mistaken.  If this was the optimized plan there would be
>> no subqueries.
>>
>
>


-- 
---
Takeshi Yamamuro


Re: GC overhead limit exceeded

2016-05-16 Thread Takeshi Yamamuro
27 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory
> on localhost:59407 (size: 19.3 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_15_piece0 in
> memory
> on localhost:59407 (size: 8.2 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_13_piece0 in
> memory
> on localhost:59407 (size: 19.3 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_14_piece0 in
> memory
> on localhost:59407 (size: 19.3 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO Executor: Told to re-register on heartbeat
> 16/05/16 15:22:27 INFO BlockManager: BlockManager re-registering with
> master
> 16/05/16 15:22:27 INFO BlockManagerMaster: Trying to register BlockManager
> 16/05/16 15:22:27 INFO BlockManagerMaster: Registered BlockManager
> 16/05/16 15:22:27 INFO BlockManager: Reporting 8 blocks to the master.
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory
> on localhost:59407 (size: 19.3 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO SparkUI: Stopped Spark web UI at
> http://192.168.107.30:4040
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_15_piece0 in
> memory
> on localhost:59407 (size: 8.2 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_13_piece0 in
> memory
> on localhost:59407 (size: 19.3 KB, free: 51.5 GB)
> 16/05/16 15:22:27 INFO BlockManagerInfo: Added broadcast_14_piece0 in
> memory
> on localhost:59407 (size: 19.3 KB, free: 51.5 GB)
> 16/05/16 15:22:56 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 05-16 15:22:56.111 127.0.0.1:54321   2059   #e Thread WARN: Swapping!
> GC CALLBACK, (K/V:29.74 GB + POJO:15.20 GB + FREE:12.56 GB == MEM_MAX:57.50
> GB), desiredKV=8.12 GB OOM!
> 16/05/16 15:22:56 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
> shut down.
> 16/05/16 15:22:56 WARN NettyRpcEndpointRef: Error sending message [message
> =
> Heartbeat(driver,[Lscala.Tuple2;@797268e9,BlockManagerId(driver,
> localhost,
> 59407))] in 1 attempts
> org.apache.spark.SparkException: Could not find HeartbeatReceiver or it has
> been stopped.
> at
> org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:161)
> at
>
> org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:126)
> at
> org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
> at
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:511)
> at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
> at
> org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:449)
> at
>
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:470)
> at
>
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)
> at
>
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
> at
> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:470)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)"
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/GC-overhead-limit-exceeded-tp26966.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: GC overhead limit exceeded

2016-05-16 Thread Takeshi Yamamuro
To understand the issue, you need to describe more about your case;
what's the version of spark you use and what's your job?
Also, what if you directly use scala interfaces instead of python ones?

On Mon, May 16, 2016 at 11:56 PM, Aleksandr Modestov <
aleksandrmodes...@gmail.com> wrote:

> Hi,
>
> "Why did you though you have enough memory for your task? You checked task
> statistics in your WebUI?". I mean that I have jnly about 5Gb data but
> spark.driver memory in 60Gb. I check task statistics in web UI.
> But really spark says that
> *"05-16 17:50:06.254 127.0.0.1:54321 <http://127.0.0.1:54321>   1534
> #e Thread WARN: Swapping!  GC CALLBACK, (K/V:29.74 GB + POJO:18.97 GB +
> FREE:8.79 GB == MEM_MAX:57.50 GB), desiredKV=7.19 GB OOM!Exception in
> thread "Heartbeat" java.lang.OutOfMemoryError: Java heap space"*
> But why spark doesn't split data into a disk?
>
> On Mon, May 16, 2016 at 5:11 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> Why did you though you have enough memory for your task? You checked task
>> statistics in your WebUI?
>> Anyway, If you get stuck with the GC issue, you'd better off increasing
>> the number of partitions.
>>
>> // maropu
>>
>> On Mon, May 16, 2016 at 10:00 PM, AlexModestov <
>> aleksandrmodes...@gmail.com> wrote:
>>
>>> I get the error in the apache spark...
>>>
>>> "spark.driver.memory 60g
>>> spark.python.worker.memory 60g
>>> spark.master local[*]"
>>>
>>> The amount of data is about 5Gb, but spark says that "GC overhead limit
>>> exceeded". I guess that my conf-file gives enought resources.
>>>
>>> "16/05/16 15:13:02 WARN NettyRpcEndpointRef: Error sending message
>>> [message
>>> = Heartbeat(driver,[Lscala.Tuple2;@87576f9,BlockManagerId(driver,
>>> localhost,
>>> 59407))] in 1 attempts
>>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10
>>> seconds]. This timeout is controlled by spark.executor.heartbeatInterval
>>> at
>>> org.apache.spark.rpc.RpcTimeout.org
>>> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>> at
>>>
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>> at
>>>
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>> at
>>>
>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>>> at
>>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
>>> at
>>>
>>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>>> at
>>> org.apache.spark.executor.Executor.org
>>> $apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:449)
>>> at
>>>
>>> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:470)
>>> at
>>>
>>> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)
>>> at
>>>
>>> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470)
>>> at
>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
>>> at
>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:470)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at
>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> at
>>>
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> at
>>>
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>>> [10 seconds]
>>> at
>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>> at
>>> scala.concurren

Re: Error joining dataframes

2016-05-18 Thread Takeshi Yamamuro
Look weird, seems spark-v1.5.x can accept the query.
What's the difference between the example and your query?



Welcome to

    __

 / __/__  ___ _/ /__

_\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2

  /_/

scala> :paste

// Entering paste mode (ctrl-D to finish)

val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")

val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")

df1.join(df2, df1("id") === df2("id"), "outer").show


// Exiting paste mode, now interpreting.


+++++

|  id|   A|  id|   B|

+++++

|   1|   0|null|null|

|   2|   0|   2|   0|

|null|null|   3|   0|

+++++


df1: org.apache.spark.sql.DataFrame = [id: int, A: int]

df2: org.apache.spark.sql.DataFrame = [id: int, B: int]





On Wed, May 18, 2016 at 3:52 PM, ram kumar <ramkumarro...@gmail.com> wrote:

> I tried
> df1.join(df2, df1("id") === df2("id"), "outer").show
>
> But there is a duplicate "id" and when I query the "id", I get
> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>
> I am currently using spark 1.5.2.
> Is there any alternative way in 1.5
>
> Thanks
>
> On Wed, May 18, 2016 at 12:12 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Also, you can pass the query that you'd like to use in spark-v1.6+;
>>
>> val df1 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "A")
>> val df2 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "B")
>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>
>> // maropu
>>
>>
>> On Wed, May 18, 2016 at 3:29 PM, ram kumar <ramkumarro...@gmail.com>
>> wrote:
>>
>>> If I run as
>>> val rs = s.join(t,"time_id").join(c,"channel_id")
>>>
>>> It takes as inner join.
>>>
>>>
>>> On Wed, May 18, 2016 at 2:31 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> pretty simple, a similar construct to tables projected as DF
>>>>
>>>> val c =
>>>> HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
>>>> val t =
>>>> HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
>>>> val rs = s.join(t,"time_id").join(c,"channel_id")
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 17 May 2016 at 21:52, Bijay Kumar Pathak <bkpat...@mtu.edu> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Try this one:
>>>>>
>>>>>
>>>>> df_join = df1.*join*(df2, 'Id', "fullouter")
>>>>>
>>>>> Thanks,
>>>>> Bijay
>>>>>
>>>>>
>>>>> On Tue, May 17, 2016 at 9:39 AM, ram kumar <ramkumarro...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I tried to join two dataframe
>>>>>>
>>>>>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>>>>>
>>>>>> df_join.registerTempTable("join_test")
>>>>>>
>>>>>>
>>>>>> When querying "Id" from "join_test"
>>>>>>
>>>>>> 0: jdbc:hive2://> *select Id from join_test;*
>>>>>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>>>>>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>>>>> 0: jdbc:hive2://>
>>>>>>
>>>>>> Is there a way to merge the value of df1("Id") and df2("Id") into one
>>>>>> "Id"
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark handling spill overs

2016-05-12 Thread Takeshi Yamamuro
Hi,

Which version of Spark you use?
The recent one cannot handle this kind of spilling, see:
http://spark.apache.org/docs/latest/tuning.html#memory-management-overview.

// maropu

On Fri, May 13, 2016 at 8:07 AM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

> Hi,
>
> How one can avoid having Spark spill over after filling the node's memory.
>
> Thanks
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: YARN vs Standalone Spark Usage in production

2016-04-14 Thread Takeshi Yamamuro
Hi,

How about checking Spark survey result 2015 in
https://databricks.com/blog/2015/09/24/spark-survey-results-2015-are-now-available.html
for the statistics?

// maropu

On Fri, Apr 15, 2016 at 4:52 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> That's also available in standalone.
>
> On Thu, Apr 14, 2016 at 12:47 PM, Alexander Pivovarov <
> apivova...@gmail.com> wrote:
>
>> Spark on Yarn supports dynamic resource allocation
>>
>> So, you can run several spark-shells / spark-submits / spark-jobserver /
>> zeppelin on one cluster without defining upfront how many executors /
>> memory you want to allocate to each app
>>
>> Great feature for regular users who just want to run Spark / Spark SQL
>>
>>
>> On Thu, Apr 14, 2016 at 12:05 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> I don't think usage is the differentiating factor. YARN and standalone
>>> are pretty well supported. If you are only running a Spark cluster by
>>> itself with nothing else, standalone is probably simpler than setting
>>> up YARN just for Spark. However if you're running on a cluster that
>>> will host other applications, you'll need to integrate with a shared
>>> resource manager and its security model, and for anything
>>> Hadoop-related that's YARN. Standalone wouldn't make as much sense.
>>>
>>> On Thu, Apr 14, 2016 at 6:46 PM, Alexander Pivovarov
>>> <apivova...@gmail.com> wrote:
>>> > AWS EMR includes Spark on Yarn
>>> > Hortonworks and Cloudera platforms include Spark on Yarn as well
>>> >
>>> >
>>> > On Thu, Apr 14, 2016 at 7:29 AM, Arkadiusz Bicz <
>>> arkadiusz.b...@gmail.com>
>>> > wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> Is there any statistics regarding YARN vs Standalone Spark Usage in
>>> >> production ?
>>> >>
>>> >> I would like to choose most supported and used technology in
>>> >> production for our project.
>>> >>
>>> >>
>>> >> BR,
>>> >>
>>> >> Arkadiusz Bicz
>>> >>
>>> >> -
>>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> >> For additional commands, e-mail: user-h...@spark.apache.org
>>> >>
>>> >
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Strange bug: Filter problem with parenthesis

2016-04-14 Thread Takeshi Yamamuro
Hi,

Seems you cannot use reserved words (e.g., sum and avg) in the Spark SQL
parser because an input string in filter is processed by the parser inside.

// maropu

On Thu, Apr 14, 2016 at 11:14 PM, <saif.a.ell...@wellsfargo.com> wrote:

> Appreciated Michael, but this doesn’t help my case, the filter string is
> being submitted from outside my program, is there any other alternative?
> some literal string parser or anything I can do before?
>
>
>
> Saif
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* Wednesday, April 13, 2016 6:29 PM
> *To:* Ellafi, Saif A.
> *Cc:* user
> *Subject:* Re: Strange bug: Filter problem with parenthesis
>
>
>
> You need to use `backticks` to reference columns that have non-standard
> characters.
>
>
>
> On Wed, Apr 13, 2016 at 6:56 AM, <saif.a.ell...@wellsfargo.com> wrote:
>
> Hi,
>
>
>
> I am debugging a program, and for some reason, a line calling the
> following is failing:
>
>
>
> df.filter("sum(OpenAccounts) > 5").show
>
>
>
> It says it cannot find the column *OpenAccounts*, as if it was applying
> the sum() function and looking for a column called like that, where there
> is not. This works fine if I rename the column to something without
> parenthesis.
>
>
>
> I can’t reproduce this issue in Spark Shell (1.6.0), any ideas on how can
> I analyze this? This is an aggregation result, with the default column
> names afterwards.
>
>
>
> PS: Workaround is to use toDF(cols) and rename all columns, but I am
> wondering if toDF has any impact on the RDD structure behind (e.g.
> repartitioning, cache, etc)
>
>
>
> Appreciated,
>
> Saif
>
>
>
>
>



-- 
---
Takeshi Yamamuro


Re: When did Spark started supporting ORC and Parquet?

2016-04-14 Thread Takeshi Yamamuro
Hi,

See SPARK-2883 <https://issues.apache.org/jira/browse/SPARK-2883> for ORC
supports.

// maropu

On Fri, Apr 15, 2016 at 11:22 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> For Parquet, please take a look at SPARK-1251
>
> For ORC, not sure.
> Looking at git history, I found ORC mentioned by SPARK-1368
>
> FYI
>
> On Thu, Apr 14, 2016 at 6:53 PM, Edmon Begoli <ebeg...@gmail.com> wrote:
>
>> I am needing this fact for the research paper I am writing right now.
>>
>> When did Spark start supporting Parquet and when ORC?
>> (what release)
>>
>> I appreciate any info you can offer.
>>
>> Thank you,
>> Edmon
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Memory needs when using expensive operations like groupBy

2016-04-14 Thread Takeshi Yamamuro
Hi,

You should not directly use these JVM options, and
you can use `spark.executor.memory` and `spark.driver.memory` for the
optimization.

// maropu

On Thu, Apr 14, 2016 at 11:32 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Hi,
> I am using Spark 1.5.2 with Scala 2.10 and my Spark job keeps failing with
> exit code 143 .
> except one job where I am using unionAll and groupBy operation on multiple
> columns .
>
> Please advice me the options to optimize it .
> The one option which I am using it now
> --conf spark.executor.extraJavaOptions  -XX:MaxPermSize=1024m
> -XX:PermSize=256m --conf spark.driver.extraJavaOptions
>  -XX:MaxPermSize=1024m -XX:PermSize=256m --conf
> spark.yarn.executor.memoryOverhead=1024
>
> Need to know the best practices/better ways to optimize code.
>
> Thanks,
> Divya
>
>


-- 
---
Takeshi Yamamuro


Re: Spark sql not pushing down timestamp range queries

2016-04-14 Thread Takeshi Yamamuro
Hi, Mich

Did you check the URL Josh referred to?;
the cast for string comparisons is needed for accepting `c_date >= "2016"`.

// maropu


On Fri, Apr 15, 2016 at 10:30 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Hi,
>
>
> String comparison itself is pushed down fine but the problem is to deal
> with Cast.
>
>
> It was pushed down before but is was reverted, (
> https://github.com/apache/spark/pull/8049).
>
> Several fixes were tried here, https://github.com/apache/spark/pull/11005
> and etc. but there were no changes to make it.
>
>
> To cut it short, it is not being pushed down because it is unsafe to
> resolve cast (eg. long to integer)
>
> For an workaround,  the implementation of Solr data source should be
> changed to one with CatalystScan, which take all the filters.
>
> But CatalystScan is not designed to be binary compatible across releases,
> however it looks some think it is stable now, as mentioned here,
> https://github.com/apache/spark/pull/10750#issuecomment-175400704.
>
>
> Thanks!
>
>
> 2016-04-15 3:30 GMT+09:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>
>> Hi Josh,
>>
>> Can you please clarify whether date comparisons as two strings work at
>> all?
>>
>> I was under the impression is that with string comparison only first
>> characters are compared?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 14 April 2016 at 19:26, Josh Rosen <joshro...@databricks.com> wrote:
>>
>>> AFAIK this is not being pushed down because it involves an implicit cast
>>> and we currently don't push casts into data sources or scans; see
>>> https://github.com/databricks/spark-redshift/issues/155 for a
>>> possibly-related discussion.
>>>
>>> On Thu, Apr 14, 2016 at 10:27 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Are you comparing strings in here or timestamp?
>>>>
>>>> Filter ((cast(registration#37 as string) >= 2015-05-28) &&
>>>> (cast(registration#37 as string) <= 2015-05-29))
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 14 April 2016 at 18:04, Kiran Chitturi <
>>>> kiran.chitt...@lucidworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Timestamp range filter queries in SQL are not getting pushed down to
>>>>> the PrunedFilteredScan instances. The filtering is happening at the Spark
>>>>> layer.
>>>>>
>>>>> The physical plan for timestamp range queries is not showing the
>>>>> pushed filters where as range queries on other types is working fine as 
>>>>> the
>>>>> physical plan is showing the pushed filters.
>>>>>
>>>>> Please see below for code and examples.
>>>>>
>>>>> *Example:*
>>>>>
>>>>> *1.* Range filter queries on Timestamp types
>>>>>
>>>>>*code: *
>>>>>
>>>>>> sqlContext.sql("SELECT * from events WHERE `registration` >=
>>>>>> '2015-05-28' AND `registration` <= '2015-05-29' ")
>>>>>
>>>>>*Full example*:
>>>>> https://github.com/lucidworks/spark-solr/blob/master/src/test/scala/com/lucidworks/spark/EventsimTestSuite.scala#L151
>>>>> *plan*:
>>>>> https://gist.github.com/kiranchitturi/4a52688c9f0abe3d4b2bd8b938044421#file-time-range-sql
>>>>>
>>>>> *2. * Range filter queries on Long types
>>>>>
>>>>> *code*:
>>>>>
>>>>>> sqlContext.sql("SELECT * from events WHERE `length` >= '700' and
>>>>>> `length` <= '1000'")
>>>>>
>>>>> *Full example*:
>>>>> https://github.com/lucidworks/spark-solr/blob/master/src/test/scala/com/lucidworks/spark/EventsimTestSuite.scala#L151
>>>>> *plan*:
>>>>> https://gist.github.com/kiranchitturi/4a52688c9f0abe3d4b2bd8b938044421#file-length-range-sql
>>>>>
>>>>> The SolrRelation class we use extends
>>>>> <https://github.com/lucidworks/spark-solr/blob/master/src/main/scala/com/lucidworks/spark/SolrRelation.scala#L37>
>>>>> the PrunedFilteredScan.
>>>>>
>>>>> Since Solr supports date ranges, I would like for the timestamp
>>>>> filters to be pushed down to the Solr query.
>>>>>
>>>>> Are there limitations on the type of filters that are passed down with
>>>>> Timestamp types ?
>>>>> Is there something that I should do in my code to fix this ?
>>>>>
>>>>> Thanks,
>>>>> --
>>>>> Kiran Chitturi
>>>>>
>>>>>
>>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark 1.6.1 DataFrame write to JDBC

2016-04-20 Thread Takeshi Yamamuro
Sorry to wrongly send message in mid.
How about trying to increate 'batchsize` in a jdbc option to improve
performance?

// maropu

On Thu, Apr 21, 2016 at 2:15 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> How about trying to increate 'batchsize
>
> On Wed, Apr 20, 2016 at 7:14 AM, Jonathan Gray <jonny.g...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm trying to write ~60 million rows from a DataFrame to a database using
>> JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)
>>
>> The write seems to not be performing well.  Profiling the application
>> with a master of local[*] it appears there is not much socket write
>> activity and also not much CPU.
>>
>> I would expect there to be an almost continuous block of socket write
>> activity showing up somewhere in the profile.
>>
>> I can see that the top hot method involves
>> apache.spark.unsafe.platform.CopyMemory all from calls within
>> JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly
>> stressed so I'm guessing this isn't the cause of the problem.
>>
>> Is there any best practices or has anyone come across a case like this
>> before where a write to a database seems to perform poorly?
>>
>> Thanks,
>> Jon
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
---
Takeshi Yamamuro


Re: DataFrame cannot find temporary table

2016-05-09 Thread Takeshi Yamamuro
Hi,

What's `convertRDDToDF`?
Seems you use different `SQLContext` between table registration and
querying.

//maropu


On Tue, May 10, 2016 at 2:46 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Have you created sqlContext based on HiveContext?
>
>
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>
> df.registerTempTable("person")
> ...
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 May 2016 at 18:33, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have created dataframe with below code and I was able to print the
>> schema but unfortuntely cannot pull the data from the temporary table. It
>> always says that table is not found
>>
>> val df=convertRDDToDF(records, mapper, errorRecords, sparkContext);
>>
>> import sqlContext._
>> df.printSchema()
>> df.registerTempTable("person")
>> val personRecords = sqlContext.sql("select * from person")
>> personRecords.foreach { println }
>>
>> Schema Output:
>> root
>>  |-- address: struct (nullable = true)
>>  ||-- city: string (nullable = true)
>>  ||-- line1: string (nullable = true)
>>  ||-- state: string (nullable = true)
>>  ||-- zip: string (nullable = true)
>>  |-- first: string (nullable = true)
>>  |-- last: string (nullable = true)
>>
>> *Error while accessing table:*
>> Exception in thread "main" org.apache.spark.sql.AnalysisException: Table
>> not found: person;
>>
>> Does anyone have solution for this?
>>
>> Thanks,
>> Asmath
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Takeshi Yamamuro
Hi,

Seems you'd be better off using DataFrame#join instead of  RDD.cartesian
because it always needs shuffle operations which have alot of overheads
such as reflection, serialization, ...
In your case,  since the smaller table is 7mb, DataFrame#join uses a
broadcast strategy.
This is a little more efficient than  RDD.cartesian.

// maropu

On Wed, May 25, 2016 at 4:20 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> It is basically a Cartesian join like RDBMS
>
> Example:
>
> SELECT * FROM FinancialCodes,  FinancialData
>
> The results of this query matches every row in the FinancialCodes table
> with every row in the FinancialData table.  Each row consists of all
> columns from the FinancialCodes table followed by all columns from the
> FinancialData table.
>
>
> Not very useful
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 25 May 2016 at 08:05, Priya Ch <learnings.chitt...@gmail.com> wrote:
>
>> Hi All,
>>
>>   I have two RDDs A and B where in A is of size 30 MB and B is of size 7
>> MB, A.cartesian(B) is taking too much time. Is there any bottleneck in
>> cartesian operation ?
>>
>> I am using spark 1.6.0 version
>>
>> Regards,
>> Padma Ch
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Cartesian join on RDDs taking too much time

2016-05-25 Thread Takeshi Yamamuro
Why did you use Rdd#saveAsTextFile instead of DataFrame#save writing as
parquet, orc, ...?

// maropu

On Wed, May 25, 2016 at 7:10 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> Hi , Yes I have joined using DataFrame join. Now to save this into hdfs .I
> am converting the joined dataframe to rdd (dataframe.rdd) and using
> saveAsTextFile, trying to save it. However, this is also taking too much
> time.
>
> Thanks,
> Padma Ch
>
> On Wed, May 25, 2016 at 1:32 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> Seems you'd be better off using DataFrame#join instead of  RDD.cartesian
>> because it always needs shuffle operations which have alot of overheads
>> such as reflection, serialization, ...
>> In your case,  since the smaller table is 7mb, DataFrame#join uses a
>> broadcast strategy.
>> This is a little more efficient than  RDD.cartesian.
>>
>> // maropu
>>
>> On Wed, May 25, 2016 at 4:20 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> It is basically a Cartesian join like RDBMS
>>>
>>> Example:
>>>
>>> SELECT * FROM FinancialCodes,  FinancialData
>>>
>>> The results of this query matches every row in the FinancialCodes table
>>> with every row in the FinancialData table.  Each row consists of all
>>> columns from the FinancialCodes table followed by all columns from the
>>> FinancialData table.
>>>
>>>
>>> Not very useful
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 25 May 2016 at 08:05, Priya Ch <learnings.chitt...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>>   I have two RDDs A and B where in A is of size 30 MB and B is of size
>>>> 7 MB, A.cartesian(B) is taking too much time. Is there any bottleneck in
>>>> cartesian operation ?
>>>>
>>>> I am using spark 1.6.0 version
>>>>
>>>> Regards,
>>>> Padma Ch
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Facing issues while reading parquet file in spark 1.2.1

2016-05-25 Thread Takeshi Yamamuro
Hi,

You need to describe more to make others easily understood;
what's the version of spark and what's the query you use?

// maropu


On Wed, May 25, 2016 at 8:27 PM, vaibhav srivastava <vaibhavcs...@gmail.com>
wrote:

> Hi All,
>
>  I am facing below stack traces while reading data from parquet file
>
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 7
>
> at parquet.bytes.BytesUtils.bytesToLong(BytesUtils.java:247)
>
> at
> parquet.column.statistics.LongStatistics.setMinMaxFromBytes(LongStatistics.java:47)
>
> at
> parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
>
> at
> parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543)
>
> at
> parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520)
>
> at
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426)
>
> at
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:389)
>
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:457)
>
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:457)
>
> at scala.Option.map(Option.scala:145)
>
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:457)
>
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:477)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation.(ParquetRelation.scala:65)
>
> at
> org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:165)
>
> Please suggest. It seems like it not able to convert some data
>



-- 
---
Takeshi Yamamuro


  1   2   3   >