Re: Spark SQL create table

2016-01-18 Thread Ted Yu
Please take a look
at sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
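
For quick reference outside the test suite, a minimal sketch of creating a persistent table through a HiveContext (the table name and columns are made up):

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
// Create a table registered in the Hive metastore (hypothetical schema)
hiveCtx.sql("CREATE TABLE IF NOT EXISTS people (name STRING, age INT)")
// Or persist an existing DataFrame as a metastore table:
// df.write.saveAsTable("people_saved")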

On Mon, Jan 18, 2016 at 9:57 AM, raghukiran  wrote:

> Is creating a table using the SparkSQLContext currently supported?
>
> Regards,
> Raghu
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-create-table-tp25996.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL create table

2016-01-18 Thread Ted Yu
By SparkSQLContext, I assume you mean SQLContext.
From the doc for SQLContext#createDataFrame():

   *  dataFrame.registerTempTable("people")
   *  sqlContext.sql("select name from people").collect.foreach(println)

If you want to persist the table externally, you need Hive, etc.
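
For the temporary-table path, a minimal sketch (the input path and table name are made up):

// Register a DataFrame as a temporary table; no Hive metastore required,
// but the table only lives as long as this SQLContext.
val df = sqlContext.read.json("people.json")   // hypothetical input file
df.registerTempTable("people")
sqlContext.sql("SELECT name FROM people").show()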

Regards

On Mon, Jan 18, 2016 at 10:28 AM, Raghu Ganti <raghuki...@gmail.com> wrote:

> This requires Hive to be installed and uses HiveContext, right?
>
> What is the SparkSQLContext useful for?
>
> On Mon, Jan 18, 2016 at 1:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Please take a look
>> at sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
>>
>> On Mon, Jan 18, 2016 at 9:57 AM, raghukiran <raghuki...@gmail.com> wrote:
>>
>>> Is creating a table using the SparkSQLContext currently supported?
>>>
>>> Regards,
>>> Raghu
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-create-table-tp25996.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: rdd.foreach return value

2016-01-18 Thread Ted Yu
Here is the signature for foreach:
 def foreach(f: T => Unit): Unit = withScope {

I don't think you can return an element in the way shown in the snippet.
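
A minimal illustration of the difference (values are made up): the function passed to foreach runs on the executors and its result is discarded, so use a transformation plus collect() when you need values back on the driver.

val rdd = sc.parallelize(1 to 5)

// The closure returns Unit; anything "returned" inside it is thrown away on the executor.
rdd.foreach { x => x * 2 }

// To get results on the driver, transform and collect instead:
val doubled = rdd.map(_ * 2).collect()   // Array(2, 4, 6, 8, 10), available on the driver
doubled.foreach(println)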

On Mon, Jan 18, 2016 at 7:34 PM, charles li  wrote:

> code snippet
>
>
> [inline screenshot of the code snippet, not preserved in this archive]
> the 'print' actually prints info on the worker node, but I feel confused about
> where the 'return' value goes,
> for I get nothing on the driver node.
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Re: rdd.foreach return value

2016-01-18 Thread Ted Yu
For #2, RDDs are immutable, so foreach cannot change the contents of the RDD it runs on.
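
A small sketch covering all three points (the numbers are made up): foreach is indeed an action used for side effects; it cannot modify the RDD it runs on, and values needed back on the driver are usually carried through an accumulator or produced with a transformation.

val rdd = sc.parallelize(1 to 5)

// foreach is an action that runs a side-effecting function on the executors;
// it does not (and cannot) modify rdd itself.
rdd.foreach(x => println(x))            // output appears in executor logs

// "Changing" the data means deriving a new, immutable RDD.
val squared = rdd.map(x => x * x)

// To feed a value back to the driver from foreach, use an accumulator.
val sum = sc.accumulator(0)
rdd.foreach(x => sum += x)
println(sum.value)                      // 15, visible on the driver after the action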

> On Jan 18, 2016, at 8:10 PM, charles li <charles.up...@gmail.com> wrote:
> 
> 
> hi, great thanks to david and ted, I know that the content of RDD can be 
> returned to driver using 'collect' method.
> 
> but my question is:
> 
> 
> 1. since we can write any code we like in the function passed into 'foreach',
> what happens when we actually write a 'return' statement in the foreach
> function?
> 2. as the photo shows below, the content of the RDD doesn't change after the
> foreach function, why?
> 3. I feel a little confused about the 'foreach' method, it should be an
> 'action', right? since it returns nothing. Is there any best practice for the
> 'foreach' function? Or can someone post a code snippet using the 'foreach'
> method in your application? That would be awesome.
> 
> 
> great thanks again
> 
> 
> 
> [inline screenshot (the photo referenced in point 2), not preserved in this archive]
> 
>> On Tue, Jan 19, 2016 at 11:44 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>> Here is signature for foreach:
>>  def foreach(f: T => Unit): Unit = withScope {
>> 
>> I don't think you can return element in the way shown in the snippet.
>> 
>>> On Mon, Jan 18, 2016 at 7:34 PM, charles li <charles.up...@gmail.com> wrote:
>>> code snippet
>>> 
>>> [inline screenshot: 屏幕快照 2016-01-19 上午11.32.05.png, not preserved in this archive]
>>> the 'print' actually print info on the worker node, but I feel confused 
>>> where the 'return' value 
>>> goes to. for I get nothing on the driver node.
>>> -- 
>>> --
>>> a spark lover, a quant, a developer and a good man.
>>> 
>>> http://github.com/litaotao
> 
> 
> 
> -- 
> --
> a spark lover, a quant, a developer and a good man.
> 
> http://github.com/litaotao


Re: using spark context in map function Task not serializable error

2016-01-18 Thread Ted Yu
Can you pass the properties that are needed for accessing Cassandra
without going through SparkContext?

SparkContext isn't designed to be used in the way illustrated below.
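
One hedged sketch of that idea, reusing the names from your snippet and assuming the Spark Cassandra connector is on the classpath (the "id" column and extractId helper are made up, and method1 would take the looked-up Cassandra row instead of a SparkContext):

import com.datastax.spark.connector._   // Spark Cassandra connector (assumed)

// Build the Cassandra RDD on the driver, where sc is available...
val cassandraRows = sc.cassandraTable("keySpace", "tableName")   // placeholder names
  .map(r => (r.getString("id"), r))                              // "id" is a hypothetical key column

// ...then join on the driver instead of calling sc inside map().
reRDD.map(row => (extractId(row), row))   // extractId is a hypothetical key extractor
  .join(cassandraRows)
  .map { case (_, (row, cRow)) => method1(row, cRow) }
  .saveAsTextFile(outputDir)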

Cheers

On Mon, Jan 18, 2016 at 12:29 PM, gpatcham  wrote:

> Hi,
>
> I have a use case where I need to pass sparkcontext in map function
>
> reRDD.map(row =>method1(row,sc)).saveAsTextFile(outputDir)
>
> Method1 needs spark context to query cassandra. But I see below error
>
> java.io.NotSerializableException: org.apache.spark.SparkContext
>
> Is there a way we can fix this ?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-context-in-map-funciton-TASk-not-serilizable-error-tp25998.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: building spark 1.6 throws error Rscript: command not found

2016-01-18 Thread Ted Yu
Please see:
http://www.jason-french.com/blog/2013/03/11/installing-r-in-linux/

On Mon, Jan 18, 2016 at 1:22 PM, Mich Talebzadeh 
wrote:

> ./make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.6
> -Phive -Phive-thriftserver -Pyarn
>
>
>
>
>
> INFO] --- exec-maven-plugin:1.4.0:exec (sparkr-pkg) @ spark-core_2.10 ---
>
> *../R/install-dev.sh: line 40: Rscript: command not found*
>
> [INFO]
> 
>
> [INFO] Reactor Summary:
>
> [INFO]
>
> [INFO] Spark Project Parent POM ... SUCCESS [
> 2.921 s]
>
> [INFO] Spark Project Test Tags  SUCCESS [
> 2.921 s]
>
> [INFO] Spark Project Launcher . SUCCESS [
> 17.252 s]
>
> [INFO] Spark Project Networking ... SUCCESS [
> 9.237 s]
>
> [INFO] Spark Project Shuffle Streaming Service  SUCCESS [
> 4.969 s]
>
> [INFO] Spark Project Unsafe ... SUCCESS [
> 13.384 s]
>
> [INFO] Spark Project Core . FAILURE [01:34
> min]
>
>
>
>
>
> How can I resolve this by any chance?
>
>
>
>
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> *Sybase ASE 15 Gold Medal Award 2008*
>
> A Winning Strategy: Running the most Critical Financial Data on ASE 15
>
>
> http://login.sybase.com/files/Product_Overviews/ASE-Winning-Strategy-091908.pdf
>
> Author of the books* "A Practitioner’s Guide to Upgrading to Sybase ASE
> 15", ISBN 978-0-9563693-0-7*.
>
> co-author *"Sybase Transact SQL Guidelines Best Practices", ISBN
> 978-0-9759693-0-4*
>
> *Publications due shortly:*
>
> *Complex Event Processing in Heterogeneous Environments*, ISBN:
> 978-0-9563693-3-8
>
> *Oracle and Sybase, Concepts and Contrasts*, ISBN: 978-0-9563693-1-4, volume
> one out shortly
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>


Re: using spark context in map function Task not serializable error

2016-01-18 Thread Ted Yu
Did you mean constructing a SparkContext on the worker nodes?

I'm not sure that would work, and it doesn't seem to be good practice.

On Mon, Jan 18, 2016 at 1:27 PM, Giri P <gpatc...@gmail.com> wrote:

> Can we use @transient ?
>
>
> On Mon, Jan 18, 2016 at 12:44 PM, Giri P <gpatc...@gmail.com> wrote:
>
>> I'm using spark cassandra connector to do this and the way we access
>> cassandra table is
>>
>> sc.cassandraTable("keySpace", "tableName")
>>
>> Thanks
>> Giri
>>
>> On Mon, Jan 18, 2016 at 12:37 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you pass the properties which are needed for accessing Cassandra
>>> without going through SparkContext ?
>>>
>>> SparkContext isn't designed to be used in the way illustrated below.
>>>
>>> Cheers
>>>
>>> On Mon, Jan 18, 2016 at 12:29 PM, gpatcham <gpatc...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a use case where I need to pass sparkcontext in map function
>>>>
>>>> reRDD.map(row =>method1(row,sc)).saveAsTextFile(outputDir)
>>>>
>>>> Method1 needs spark context to query cassandra. But I see below error
>>>>
>>>> java.io.NotSerializableException: org.apache.spark.SparkContext
>>>>
>>>> Is there a way we can fix this ?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-context-in-map-funciton-TASk-not-serilizable-error-tp25998.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: using spark context in map function Task not serializable error

2016-01-18 Thread Ted Yu
class SQLContext private[sql](
@transient val sparkContext: SparkContext,
@transient protected[sql] val cacheManager: CacheManager,
@transient private[sql] val listener: SQLListener,
val isRootContext: Boolean)
  extends org.apache.spark.Logging with Serializable {

FYI
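
So SQLContext itself is Serializable and its sparkContext field is transient. That said, the usual DataFrame pattern is a driver-side join rather than a per-record query; a rough sketch, assuming the Spark Cassandra connector's DataFrame source ("org.apache.spark.sql.cassandra") and made-up column names:

// Load the Cassandra table as a DataFrame (keyspace/table names as in your snippet)
val cassandraDF = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "keySpace", "table" -> "tableName"))
  .load()

// Convert the existing RDD to a DataFrame and join on the driver;
// assumes the Cassandra table has an "id" column and extractId is a hypothetical helper.
import sqlContext.implicits._
val myDF = reRDD.map(row => (extractId(row), row.toString)).toDF("id", "value")
val enriched = myDF.join(cassandraDF, "id")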

On Mon, Jan 18, 2016 at 1:44 PM, Giri P <gpatc...@gmail.com> wrote:

> yes I tried doing that but that doesn't work.
>
> I'm looking at using SQLContext and dataframes. Is SQLCOntext serializable?
>
> On Mon, Jan 18, 2016 at 1:29 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Did you mean constructing SparkContext on the worker nodes ?
>>
>> Not sure whether that would work.
>>
>> Doesn't seem to be good practice.
>>
>> On Mon, Jan 18, 2016 at 1:27 PM, Giri P <gpatc...@gmail.com> wrote:
>>
>>> Can we use @transient ?
>>>
>>>
>>> On Mon, Jan 18, 2016 at 12:44 PM, Giri P <gpatc...@gmail.com> wrote:
>>>
>>>> I'm using spark cassandra connector to do this and the way we access
>>>> cassandra table is
>>>>
>>>> sc.cassandraTable("keySpace", "tableName")
>>>>
>>>> Thanks
>>>> Giri
>>>>
>>>> On Mon, Jan 18, 2016 at 12:37 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Can you pass the properties which are needed for accessing Cassandra
>>>>> without going through SparkContext ?
>>>>>
>>>>> SparkContext isn't designed to be used in the way illustrated below.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Jan 18, 2016 at 12:29 PM, gpatcham <gpatc...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a use case where I need to pass sparkcontext in map function
>>>>>>
>>>>>> reRDD.map(row =>method1(row,sc)).saveAsTextFile(outputDir)
>>>>>>
>>>>>> Method1 needs spark context to query cassandra. But I see below error
>>>>>>
>>>>>> java.io.NotSerializableException: org.apache.spark.SparkContext
>>>>>>
>>>>>> Is there a way we can fix this ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-context-in-map-funciton-TASk-not-serilizable-error-tp25998.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: SQL UDF problem (with re to types)

2016-01-17 Thread Ted Yu
While reading a book on Java 8, I saw a reference to the following
w.r.t. declaration-site variance:

https://bugs.openjdk.java.net/browse/JDK-8043488

The above reportedly targets Java 9.

FYI

On Thu, Jan 14, 2016 at 12:33 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I don't believe that Java 8 got rid of erasure. In fact I think its
> actually worse when you use Java 8 lambdas.
>
> On Thu, Jan 14, 2016 at 10:54 AM, Raghu Ganti <raghuki...@gmail.com>
> wrote:
>
>> Would this go away if the Spark source was compiled against Java 1.8
>> (since the problem of type erasure is solved through proper generics
>> implementation in Java 1.8).
>>
>> On Thu, Jan 14, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> We automatically convert types for UDFs defined in Scala, but we can't
>>> do it in Java because the types are erased by the compiler.  If you want to
>>> use double you should cast before calling the UDF.
>>>
>>> On Wed, Jan 13, 2016 at 8:10 PM, Raghu Ganti <raghuki...@gmail.com>
>>> wrote:
>>>
>>>> So, when I try BigDecimal, it works. But, should it not parse based on
>>>> what the UDF defines? Am I missing something here?
>>>>
>>>> On Wed, Jan 13, 2016 at 4:57 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Please take a look
>>>>> at 
>>>>> sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
>>>>> which shows a UserDefinedAggregateFunction that works on DoubleType 
>>>>> column.
>>>>>
>>>>> sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
>>>>> shows how it is registered.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Jan 13, 2016 at 11:58 AM, raghukiran <raghuki...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> While registering and using SQL UDFs, I am running into the following
>>>>>> problem:
>>>>>>
>>>>>> UDF registered:
>>>>>>
>>>>>> ctx.udf().register("Test", new UDF1<Double, String>() {
>>>>>> /**
>>>>>>  *
>>>>>>  */
>>>>>> private static final long serialVersionUID =
>>>>>> -8231917155671435931L;
>>>>>>
>>>>>> public String call(Double x) throws Exception
>>>>>> {
>>>>>> return "testing";
>>>>>> }
>>>>>> }, DataTypes.StringType);
>>>>>>
>>>>>> Usage:
>>>>>> query = "SELECT Test(82.4)";
>>>>>> result = sqlCtx.sql(query).first();
>>>>>> System.out.println(result.toString());
>>>>>>
>>>>>> Problem: Class Cast exception thrown
>>>>>> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot
>>>>>> be cast
>>>>>> to java.lang.Double
>>>>>>
>>>>>> This problem occurs with Spark v1.5.2 and 1.6.0.
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to tune my spark application.

2016-01-17 Thread Ted Yu
In sampleArray(), there is a loop:
for (i <- 0 until ARRAY_SAMPLE_SIZE) {

ARRAY_SAMPLE_SIZE is a constant (100).

It is not clear how the amount of computation in sampleArray() can be reduced.

Which Spark release are you using?

Thanks

On Sun, Jan 17, 2016 at 6:22 AM, 张峻  wrote:

> Dear All
>
> I used jProfiler to profiling my spark application.
> And I had find more than 70% cpu is used by the
> org.apache.spark.util.SizeEstimator class.
>
> The call tree is as below.
>
> java.lang.Thread.run
> --scala.collection.immutable.Range.foreach$mVc$sp
>
> org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp
> --scala.collection.immutable.List.foreach
>
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
> --scala.collection.immutable.List.foreach
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>
> My code doesn't show up in these two biggest branches of the call tree.
>
> I want to know what causes Spark to spend so much time in
> “Range.foreach” or “List.foreach”.
> Any one can give me some tips?
>
> BR
>
> Julian Zhang
>
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to tune my spark application.

2016-01-17 Thread Ted Yu
For 'List.foreach', it is likely iterating over the pointerFields shown below:

  private class ClassInfo(
val shellSize: Long,
val pointerFields: List[Field]) {}

FYI

On Sun, Jan 17, 2016 at 7:15 AM, 张峻 <julian_do...@me.com> wrote:

> Dear Ted
>
> My Spark release is 1.5.2
>
> BR
>
> Julian Zhang
>
> 在 2016年1月17日,23:10,Ted Yu <yuzhih...@gmail.com> 写道:
>
> In sampleArray(), there is a loop:
> for (i <- 0 until ARRAY_SAMPLE_SIZE) {
>
> ARRAY_SAMPLE_SIZE is a constant (100).
>
> Not clear how the amount of computation in sampleArray() can be reduced.
>
> Which Spark release are you using ?
>
> Thanks
>
> On Sun, Jan 17, 2016 at 6:22 AM, 张峻 <julian_do...@me.com> wrote:
>
>> Dear All
>>
>> I used jProfiler to profiling my spark application.
>> And I had find more than 70% cpu is used by the
>> org.apache.spark.util.SizeEstimator class.
>>
>> There call tree is as blow.
>>
>> java.lang.Thread.run
>> --scala.collection.immutable.Range.foreach$mVc$sp
>>
>> org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp
>> --scala.collection.immutable.List.foreach
>>
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>> --scala.collection.immutable.List.foreach
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply
>>
>> My code don’t show in this two biggest branch of the call tree.
>>
>> I want to know what will cause spark to spend so many time in
>> “Range.foreach” or “.List.foreach”
>> Any one can give me some tips?
>>
>> BR
>>
>> Julian Zhang
>>
>>
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Sending large objects to specific RDDs

2016-01-16 Thread Ted Yu
Both groupByKey() and join() accept a Partitioner as a parameter.

Maybe you can specify a custom Partitioner so that the amount of shuffling is
reduced.
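
A minimal sketch with the RDD names from your message (the partition count is arbitrary): give both sides the same partitioner so the subsequent join sees co-partitioned inputs and avoids an extra shuffle.

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(64)   // arbitrary; size it to your cluster

// Both sides are now partitioned identically, so join() does not re-shuffle them.
val groupedVectors = partitionedVectors.groupByKey(part)
val alignedIndexes = InvIndexes.partitionBy(part)

val partitionedTasks = groupedVectors.join(alignedIndexes)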

On Sat, Jan 16, 2016 at 9:39 AM, Daniel Imberman <daniel.imber...@gmail.com>
wrote:

> Hi Ted,
>
> I think I might have figured something out!(Though I haven't tested it at
> scale yet)
>
> My current thought is that I can do a groupByKey on the RDD of vectors and
> then do a join with the invertedIndex.
> It would look something like this:
>
> val InvIndexes:RDD[(Int,InvertedIndex)]
> val partitionedVectors:RDD[(Int, Vector)]
>
> val partitionedTasks:RDD[(Int, (Iterator[Vector], InvertedIndex))] =
> partitionedvectors.groupByKey().join(invIndexes)
>
> val similarities = partitionedTasks.map(//calculate similarities)
> val maxSim = similarities.reduce(math.max)
>
>
> So while I realize that usually a groupByKey is usually frowned upon, it
> seems to me that since I need all associated vectors to be local anyways
> that this repartitioning would not be too expensive.
>
> Does this seem like a reasonable approach to this problem or are there any
> faults that I should consider should I approach it this way?
>
> Thank you for your help,
>
> Daniel
>
> On Fri, Jan 15, 2016 at 5:30 PM Ted Yu <yuzhih...@gmail.com> wrote:
>
>> My knowledge of XSEDE is limited - I visited the website.
>>
>> If there is no easy way to deploy HBase, alternative approach (using hdfs
>> ?) needs to be considered.
>>
>> I need to do more homework on this :-)
>>
>> On Thu, Jan 14, 2016 at 3:51 PM, Daniel Imberman <
>> daniel.imber...@gmail.com> wrote:
>>
>>> Hi Ted,
>>>
>>> So unfortunately after looking into the cluster manager that I will be
>>> using for my testing (I'm using a super-computer called XSEDE rather than
>>> AWS), it looks like the cluster does not actually come with Hbase installed
>>> (this cluster is becoming somewhat problematic, as it is essentially AWS
>>> but you have to do your own virtualization scripts). Do you have any other
>>> thoughts on how I could go about dealing with this purely using spark and
>>> HDFS?
>>>
>>> Thank you
>>>
>>> On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman <
>>> daniel.imber...@gmail.com> wrote:
>>>
>>>> Thank you Ted! That sounds like it would probably be the most efficient
>>>> (with the least overhead) way of handling this situation.
>>>>
>>>> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Another approach is to store the objects in NoSQL store such as HBase.
>>>>>
>>>>> Looking up object should be very fast.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
>>>>> daniel.imber...@gmail.com> wrote:
>>>>>
>>>>>> I'm looking for a way to send structures to pre-determined partitions
>>>>>> so that
>>>>>> they can be used by another RDD in a mapPartition.
>>>>>>
>>>>>> Essentially I'm given and RDD of SparseVectors and an RDD of inverted
>>>>>> indexes. The inverted index objects are quite large.
>>>>>>
>>>>>> My hope is to do a MapPartitions within the RDD of vectors where I can
>>>>>> compare each vector to the inverted index. The issue is that I only
>>>>>> NEED one
>>>>>> inverted index object per partition (which would have the same key as
>>>>>> the
>>>>>> values within that partition).
>>>>>>
>>>>>>
>>>>>> val vectors:RDD[(Int, SparseVector)]
>>>>>>
>>>>>> val invertedIndexes:RDD[(Int, InvIndex)] =
>>>>>> a.reduceByKey(generateInvertedIndex)
>>>>>> vectors:RDD.mapPartitions{
>>>>>> iter =>
>>>>>>  val invIndex = invertedIndexes(samePartitionKey)
>>>>>>  iter.map(invIndex.calculateSimilarity(_))
>>>>>>  )
>>>>>> }
>>>>>>
>>>>>> How could I go about setting up the Partition such that the specific
>>>>>> data
>>>>>> structure I need will be present for the mapPartition but I won't
>>>>>> have the
>>>>>> extra overhead of sending over all values (which would happen if I
>>>

Re: spark job server

2016-01-16 Thread Ted Yu
Which distro are you using?

From the error message, compute-classpath.sh was not found.
I searched the Spark 1.6 build for Hadoop 2.6 but didn't find
either compute-classpath.sh or server_start.sh.

Cheers

On Sat, Jan 16, 2016 at 5:33 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I am not able to start spark job sever. I am facing below error. Please
> let me know, how to resolve this issue.
>
> I have configured one master and two workers in cluster mode.
>
> ./server_start.sh
>
>
> *./server_start.sh: line 52: kill: (19621) - No such
> process./server_start.sh: line 78:
> /home/spark-1.5.2-bin-hadoop2.6/bin/compute-classpath.sh: No such file or
> directory*
> Regards,
> Rajesh
>


Re: spark source Intellij

2016-01-15 Thread Ted Yu
See:

http://search-hadoop.com/m/q3RTtZbuxxp9p6N1=Re+Best+IDE+Configuration

> On Jan 15, 2016, at 2:19 AM, Sanjeev Verma  wrote:
> 
> I want to configure the Spark source code in IntelliJ IDEA. Is there any
> document available / known steps which can guide me to configure the Spark
> project in IntelliJ IDEA?
> 
> Any help will be appreciated.
> Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

2016-01-15 Thread Ted Yu
bq. check application tracking page:
http://slave1:8088/proxy/application_1452763526769_0011/
Then, ...

Have you done the above to see what error was in each attempt?

Which Spark / Hadoop release are you using?

Thanks

On Fri, Jan 15, 2016 at 5:58 AM, Siddharth Ubale <
siddharth.ub...@syncoms.com> wrote:

> Hi,
>
>
>
> I am trying to run a Spark streaming application in yarn-cluster mode.
> However I am facing an issue where the job ends asking for a particular
> Hadoop_conf_**.zip file in hdfs location.
>
> Can any one guide with this?
>
> The application works fine in local mode only it stops abruptly for want
> of memory.
>
>
>
> Below is the error stack trace:
>
>
>
> diagnostics: Application application_1452763526769_0011 failed 2 times due
> to AM Container for appattempt_1452763526769_0011_02 exited with
> exitCode: -1000
>
> For more detailed output, check application tracking page:
> http://slave1:8088/proxy/application_1452763526769_0011/Then, click on
> links to logs of each attempt.
>
> Diagnostics: File does not exist:
> hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip
>
> *java.io.FileNotFoundException: File does not exist:
> hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip*
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
>
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
>
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
>
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
>
> at java.security.AccessController.doPrivileged(Native
> Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Failing this attempt. Failing the application.
>
> ApplicationMaster host: N/A
>
> ApplicationMaster RPC port: -1
>
> queue: default
>
> start time: 1452866026622
>
> final status: FAILED
>
> tracking URL:
> http://slave1:8088/cluster/app/application_1452763526769_0011
>
> user: hduser
>
> Exception in thread "main" org.apache.spark.SparkException: Application
> application_1452763526769_0011 finished with failed status
>
> at
> org.apache.spark.deploy.yarn.Client.run(Client.scala:841)
>
> at
> org.apache.spark.deploy.yarn.Client$.main(Client.scala:867)
>
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>
> at
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> 16/01/15 19:23:53 INFO Utils: Shutdown hook called
>
> 16/01/15 19:23:53 INFO Utils: Deleting directory
> 

Re: Serialization stack error

2016-01-15 Thread Ted Yu
Here is the signature for Get:

public class Get extends Query

  implements Row, Comparable {

It is not Serializable.


FYI

On Fri, Jan 15, 2016 at 6:37 AM, beeshma r  wrote:

> HI i am trying to get data from Solr server .
>
> This is my code
>
> /*input is JavaRDD li
> *output is  JavaRDD for scanning Hbase*/
>
>
> public static JavaRDD getdocs(JavaRDD li)
> {
>
> JavaRDD newdocs=li;
>
> JavaRDD n=newdocs.map(new Function(){
>
> public Get call(SolrDocumentList si) throws IOException
> {
> Get get = null;
>
> for (SolrDocument doc : si) {
> get = new Get(Bytes.toBytes(((String)
> doc.getFieldValue("id";
>
> }
>
> return get;
>
> }
>
>
> }
>
>
> issue am getting below error
>
> 16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 0, localhost, PROCESS_LOCAL, 2815 bytes)
> 16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
>
>
> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Get
> Serialization stack:
> - object not serializable (class: org.apache.hadoop.hbase.client.Get, value:
> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 6)
> at
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
> 16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had
> a not serializable result: org.apache.hadoop.hbase.client.Get
> Serialization stack:
> - object not serializable (class: org.apache.hadoop.hbase.client.Get,
> value:
> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 6); not retrying
> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Cancelling stage 0
> 16/01/15 06:05:57 INFO DAGScheduler: ResultStage 0 (collect at
> App.java:278) failed in 2.481 s
> 16/01/15 06:05:57 INFO DAGScheduler: Job 0 failed: collect at
> App.java:278, took 3.378240 s
> [WARNING]
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> at java.lang.Thread.run(Thread.java:724)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.hadoop.hbase.client.Get
> Serialization stack:
> - object not serializable (class: org.apache.hadoop.hbase.client.Get,
> value:
> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 6)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at
> 

Re: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

2016-01-15 Thread Ted Yu
(PhoenixConnect.java:26)
>
> at
> spark.stream.eventStream.startStream(eventStream.java:105)
>
> at
> time.series.wo.agg.InputStreamSpark.main(InputStreamSpark.java:38)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
>
> Thanks,
>
> Siddharth
>
>
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Friday, January 15, 2016 7:43 PM
> *To:* Siddharth Ubale <siddharth.ub...@syncoms.com>
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.
>
>
>
> bq. check application tracking 
> page:http://slave1:8088/proxy/application_1452763526769_0011/
> Then <http://slave1:8088/proxy/application_1452763526769_0011/Then>, ...
>
>
>
> Have you done the above to see what error was in each attempt ?
>
>
>
> Which Spark / hadoop release are you using ?
>
>
>
> Thanks
>
>
>
> On Fri, Jan 15, 2016 at 5:58 AM, Siddharth Ubale <
> siddharth.ub...@syncoms.com> wrote:
>
> Hi,
>
>
>
> I am trying to run a Spark streaming application in yarn-cluster mode.
> However I am facing an issue where the job ends asking for a particular
> Hadoop_conf_**.zip file in hdfs location.
>
> Can any one guide with this?
>
> The application works fine in local mode only it stops abruptly for want
> of memory.
>
>
>
> Below is the error stack trace:
>
>
>
> diagnostics: Application application_1452763526769_0011 failed 2 times due
> to AM Container for appattempt_1452763526769_0011_02 exited with
> exitCode: -1000
>
> For more detailed output, check application tracking page:
> http://slave1:8088/proxy/application_1452763526769_0011/Then, click on
> links to logs of each attempt.
>
> Diagnostics: File does not exist:
> hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip
>
> *java.io.FileNotFoundException: File does not exist:
> hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip*
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
>
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
>
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
>
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
>
> at java.security.AccessController.doPrivileged(Native
> Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Failing this attempt. Failing the application.
>
> ApplicationMaster host: N/A
>
> ApplicationMaster RPC port: -1
>
> queue: default
>
> start time: 1452866026622
>
> final status: FAILED
>
> tracking URL:
> http://slave1:8088/cl

Re: Serialization stack error

2016-01-15 Thread Ted Yu
Can you encapsulate your map function such that it returns a data type other
than Get?

You can perform the query against HBase, but don't return Get.
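
For example (a rough sketch, not tested; `docLists` stands in for the Scala view of your JavaRDD of SolrDocumentList): return the plain row-key strings, which are serializable, and construct the Get objects only on the executor that actually queries HBase.

import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD

// Ship only the serializable row keys; build Get objects where they are consumed.
val rowKeys: RDD[String] = docLists.flatMap { docs =>
  docs.asScala.map(doc => doc.getFieldValue("id").asInstanceOf[String])
}
// Later, e.g. inside mapPartitions, turn each key into a Get right before calling HBase.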

Cheers

On Fri, Jan 15, 2016 at 6:46 AM, beeshma r <beeshm...@gmail.com> wrote:

> Hi Ted ,
>
> Any suggestions for changing this piece of code?
>
> public static JavaRDD getdocs(JavaRDD<
> SolrDocumentList> li)
> {
>
> JavaRDD newdocs=li;
>
> JavaRDD n=newdocs.map(new Function<SolrDocumentList,Get>(){
>
> public Get call(SolrDocumentList si) throws IOException
> {
> Get get = null;
>
> for (SolrDocument doc : si) {
> get = new Get(Bytes.toBytes(((String)
> doc.getFieldValue("id";
>
> }
>
>         return get;
>
> }
>
>
> }
>
>
>
> On Fri, Jan 15, 2016 at 6:40 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Here is signature for Get:
>>
>> public class Get extends Query
>>
>>   implements Row, Comparable {
>>
>> It is not Serializable.
>>
>>
>> FYI
>>
>> On Fri, Jan 15, 2016 at 6:37 AM, beeshma r <beeshm...@gmail.com> wrote:
>>
>>> HI i am trying to get data from Solr server .
>>>
>>> This is my code
>>>
>>> /*input is JavaRDD li
>>> *output is  JavaRDD for scanning Hbase*/
>>>
>>>
>>> public static JavaRDD getdocs(JavaRDD li)
>>> {
>>>
>>> JavaRDD newdocs=li;
>>>
>>> JavaRDD n=newdocs.map(new Function<SolrDocumentList,Get>(){
>>>
>>> public Get call(SolrDocumentList si) throws IOException
>>> {
>>> Get get = null;
>>>
>>> for (SolrDocument doc : si) {
>>> get = new Get(Bytes.toBytes(((String)
>>> doc.getFieldValue("id";
>>>
>>> }
>>>
>>> return get;
>>>
>>> }
>>>
>>>
>>> }
>>>
>>>
>>> issue am getting below error
>>>
>>> 16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0
>>> (TID 0, localhost, PROCESS_LOCAL, 2815 bytes)
>>> 16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
>>> 16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0
>>> (TID 0)
>>>
>>>
>>>
>>> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Get
>>> Serialization stack:
>>> - object not serializable (class: org.apache.hadoop.hbase.client.Get, value:
>>> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
>>> - element of array (index: 0)
>>> - array (class [Ljava.lang.Object;, size 6)
>>> at
>>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>> at
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:724)
>>> 16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0)
>>> had a not serializable resul

Re: Sending large objects to specific RDDs

2016-01-15 Thread Ted Yu
My knowledge of XSEDE is limited - I visited the website.

If there is no easy way to deploy HBase, an alternative approach (using HDFS?)
needs to be considered.

I need to do more homework on this :-)

On Thu, Jan 14, 2016 at 3:51 PM, Daniel Imberman <daniel.imber...@gmail.com>
wrote:

> Hi Ted,
>
> So unfortunately after looking into the cluster manager that I will be
> using for my testing (I'm using a super-computer called XSEDE rather than
> AWS), it looks like the cluster does not actually come with Hbase installed
> (this cluster is becoming somewhat problematic, as it is essentially AWS
> but you have to do your own virtualization scripts). Do you have any other
> thoughts on how I could go about dealing with this purely using spark and
> HDFS?
>
> Thank you
>
> On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman <
> daniel.imber...@gmail.com> wrote:
>
>> Thank you Ted! That sounds like it would probably be the most efficient
>> (with the least overhead) way of handling this situation.
>>
>> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Another approach is to store the objects in NoSQL store such as HBase.
>>>
>>> Looking up object should be very fast.
>>>
>>> Cheers
>>>
>>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
>>> daniel.imber...@gmail.com> wrote:
>>>
>>>> I'm looking for a way to send structures to pre-determined partitions
>>>> so that
>>>> they can be used by another RDD in a mapPartition.
>>>>
>>>> Essentially I'm given and RDD of SparseVectors and an RDD of inverted
>>>> indexes. The inverted index objects are quite large.
>>>>
>>>> My hope is to do a MapPartitions within the RDD of vectors where I can
>>>> compare each vector to the inverted index. The issue is that I only
>>>> NEED one
>>>> inverted index object per partition (which would have the same key as
>>>> the
>>>> values within that partition).
>>>>
>>>>
>>>> val vectors:RDD[(Int, SparseVector)]
>>>>
>>>> val invertedIndexes:RDD[(Int, InvIndex)] =
>>>> a.reduceByKey(generateInvertedIndex)
>>>> vectors:RDD.mapPartitions{
>>>> iter =>
>>>>  val invIndex = invertedIndexes(samePartitionKey)
>>>>  iter.map(invIndex.calculateSimilarity(_))
>>>>  )
>>>> }
>>>>
>>>> How could I go about setting up the Partition such that the specific
>>>> data
>>>> structure I need will be present for the mapPartition but I won't have
>>>> the
>>>> extra overhead of sending over all values (which would happen if I were
>>>> to
>>>> make a broadcast variable).
>>>>
>>>> One thought I have been having is to store the objects in HDFS but I'm
>>>> not
>>>> sure if that would be a suboptimal solution (It seems like it could slow
>>>> down the process a lot)
>>>>
>>>> Another thought I am currently exploring is whether there is some way I
>>>> can
>>>> create a custom Partition or Partitioner that could hold the data
>>>> structure
>>>> (Although that might get too complicated and become problematic)
>>>>
>>>> Any thoughts on how I could attack this issue would be highly
>>>> appreciated.
>>>>
>>>> thank you for your help!
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>


Re: Compiling only MLlib?

2016-01-15 Thread Ted Yu
Looks like you didn't have zinc running.

Take a look at install_zinc() in build/mvn, around line 83.
You can use build/mvn instead of running mvn directly.

I normally use the following command line:

build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4
-Dhadoop.version=2.7.0 package -DskipTests

After one full build, you should be able to build the MLlib module alone.

Cheers

On Fri, Jan 15, 2016 at 6:13 PM, Colin Woodbury  wrote:

> Hi, I'm very much interested in using Spark's MLlib in standalone
> programs. I've never used Hadoop, and don't intend to deploy on massive
> clusters. Building Spark has been an honest nightmare, and I've been on and
> off it for weeks.
>
> The build always runs out of RAM on my laptop (4g of RAM, Arch Linux) when
> I try to build with Scala 2.11 support. No matter how I tweak JVM flags to
> reduce maximum RAM use, the build always crashes.
>
> When trying to build Spark 1.6.0 for Scala 2.10 just now, the build had
> compilation errors. Here is one, as a sample. I've saved the rest:
>
> [error]
> /home/colin/building/apache-spark/spark-1.6.0/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala:16:
> object jline is not a member of package tools
> [error] import scala.tools.jline.console.completer._
>
> It informs me:
>
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-repl_2.10
>
> I don't feel safe doing that, given that I don't know what my ""
> are.
>
> I've noticed that the build is compiling a lot of things I have no
> interest in. Is it possible to just compile the Spark core, its tools, and
> MLlib? I just want to experiment, and this is causing me a  lot of stress.
>
> Thank you kindly,
> Colin
>


Re: Executor initialize before all resources are ready

2016-01-15 Thread Ted Yu
Which Spark release are you using?

Thanks

On Fri, Jan 15, 2016 at 7:08 PM, Byron Wang  wrote:

> Hi, I am building metrics system for Spark Streaming job, in the system,
> the
> metrics are collected in each executor, so a metrics source (a class used
> to
> collect metrics) needs to be initialized in each executor.
> The metrics source is packaged in a jar, when submitting a job, the jar is
> sent from local to each executor using the parameter '--jars', however, the
> executor starts to initialize the metrics source class before the jar
> arrives, as a result, it throws class not found exception.
> It seems that if the executor could wait until all resources are ready, the
> issue will be resolved, but I really do not know how to do it.
>
> Is there anyone facing the same issue?
>
> PS: I tried using HDFS (copy the jar to HDFS, then submit the job and let
> the executor load class from a path in HDFS), but it fails. I checked the
> source code, it seems that the class loader can only resolve local path.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executor-initialize-before-all-resources-are-ready-tp25981.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark and HBase RDD join/get

2016-01-14 Thread Ted Yu
For #1, yes, it is possible.

You can find some examples in the hbase-spark module of HBase, where HBase as a
DataSource is provided, e.g.:

https://github.com/apache/hbase/blob/master/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
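
If you would rather not depend on the hbase-spark module, a common alternative (only a sketch; the table, column family and qualifier names are made up) is to open an HBase connection per partition and issue the Gets from there:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// userIds: RDD[String] of HBase row keys (hypothetical)
val enriched = userIds.mapPartitions { ids =>
  val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("users"))
  // Materialize the results before closing the connection.
  val out = ids.map { id =>
    val result = table.get(new Get(Bytes.toBytes(id)))
    // value may be null if the column is missing for this row
    (id, Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))))
  }.toList
  table.close()
  conn.close()
  out.iterator
}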

Cheers

On Thu, Jan 14, 2016 at 5:04 AM, Kristoffer Sjögren 
wrote:

> Hi
>
> We have a RDD that needs to be mapped with information from
> HBase, where the exact key is the user id.
>
> What's the different alternatives for doing this?
>
> - Is it possible to do HBase.get() requests from a map function in Spark?
> - Or should we join RDDs with all full HBase table scan?
>
> I ask because full table scans feels inefficient, especially if the
> input RDD is really small compared to the full table. But I
> realize that a full table scan may not be what happens in reality?
>
> Cheers,
> -Kristoffer
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: code hangs in local master mode

2016-01-14 Thread Ted Yu
Can you capture one or two stack traces of the local master process and
pastebin them ?

Thanks

On Thu, Jan 14, 2016 at 6:01 AM, Kai Wei  wrote:

> Hi list,
>
> I ran into an issue which I think could be a bug.
>
> I have a Hive table stored as parquet files. Let's say it's called
> testtable. I found the code below stuck forever in spark-shell with a local
> master or driver/executor:
> sqlContext.sql("select * from testtable").rdd.cache.zipWithIndex().count
>
> But it works if I use a standalone master.
>
> I also tried several different variants:
> don't cache the rdd(works):
> sqlContext.sql("select * from testtable").rdd.zipWithIndex().count
>
> cache the rdd after zipWithIndex(works):
> sqlContext.sql("select * from testtable").rdd.zipWithIndex().cache.count
>
> use parquet file reader(doesn't work):
>
> sqlContext.read.parquet("hdfs://localhost:8020/user/hive/warehouse/testtable").rdd.cache.zipWithIndex().count
>
> use parquet files on local file system(works)
> sqlContext.read.parquet("/tmp/testtable").rdd.cache.zipWithIndex().count
>
> I read the code of zipWithIndex() and the DAG visualization. I think the
> function cause the Spark firstly retrieve n-1 partitions of target table
> and cache them, then the last partition. It must be something wrong when
> the driver/executor tries to read the last parition from HDFS .
>
> I am using spark-1.5.2-bin-hadoop-2.6 on cloudera quickstart vm 5.4.2.
>
> --
> Kai Wei
> Big Data Developer
>
> Pythian - love your data
>
> w...@pythian.com
> Tel: +1 613 565 8696 x1579
> Mobile: +61 403 572 456
>
> --
>
>
>
>


Re: Concurrent Read of Accumulator's Value

2016-01-13 Thread Ted Yu
One option is to use a NoSQL data store, such as HBase, for the two actions
to exchange status information:
write to the data store from action 1 (ac1) and read it from action 2 (ac2).

Cheers

On Wed, Jan 13, 2016 at 2:20 AM, Kira  wrote:

> Hi,
>
> So i have an action on one RDD that is relatively long, let's call it ac1;
> what i want to do is to execute another action (ac2) on the same RDD to see
> the evolution of the first one (ac1); for this end i want to use an
> accumulator and read it's value progressively to see the changes on it (on
> the fly) while ac1 is always running. My problem is that the accumulator is
> only updated once the ac1 has been finished, this is not helpful for me :/
> .
>
> I ve seen  here
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
> >
> what may seem like a solution for me but it doesn t work : "While Spark
> already offers support for asynchronous reduce (collect data from workers,
> while not interrupting execution of a parallel transformation) through
> accumulator"
>
> Another post suggested to use SparkListner to do that.
>
> are these solutions correct ? if yes, give me a simple exemple ?
> are there other solutions ?
>
> thank you.
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Read-of-Accumulator-s-Value-tp25957.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SQL UDF problem (with re to types)

2016-01-13 Thread Ted Yu
Looks like a BigDecimal was passed to your call() method.

Can you modify your UDF to see if accepting BigDecimal works?
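
If you prefer to keep the Double signature, another workaround (in line with the advice elsewhere in this thread to cast before calling the UDF) is to cast the literal in the SQL, since 82.4 is parsed as a decimal; a hedged example:

// The decimal literal arrives as BigDecimal unless it is cast to DOUBLE first.
val result = sqlCtx.sql("SELECT Test(CAST(82.4 AS DOUBLE))").first()
println(result.toString)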

Cheers

On Wed, Jan 13, 2016 at 11:58 AM, raghukiran  wrote:

> While registering and using SQL UDFs, I am running into the following
> problem:
>
> UDF registered:
>
> ctx.udf().register("Test", new UDF1() {
> /**
>  *
>  */
> private static final long serialVersionUID =
> -8231917155671435931L;
>
> public String call(Double x) throws Exception {
> return "testing";
> }
> }, DataTypes.StringType);
>
> Usage:
> query = "SELECT Test(82.4)";
> result = sqlCtx.sql(query).first();
> System.out.println(result.toString());
>
> Problem: Class Cast exception thrown
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be
> cast
> to java.lang.Double
>
> This problem occurs with Spark v1.5.2 and 1.6.0.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Sending large objects to specific RDDs

2016-01-13 Thread Ted Yu
Another approach is to store the objects in a NoSQL store such as HBase.

Looking up an object should be very fast.

Cheers

On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman  wrote:

> I'm looking for a way to send structures to pre-determined partitions so
> that
> they can be used by another RDD in a mapPartition.
>
> Essentially I'm given and RDD of SparseVectors and an RDD of inverted
> indexes. The inverted index objects are quite large.
>
> My hope is to do a MapPartitions within the RDD of vectors where I can
> compare each vector to the inverted index. The issue is that I only NEED
> one
> inverted index object per partition (which would have the same key as the
> values within that partition).
>
>
> val vectors:RDD[(Int, SparseVector)]
>
> val invertedIndexes:RDD[(Int, InvIndex)] =
> a.reduceByKey(generateInvertedIndex)
> vectors:RDD.mapPartitions{
> iter =>
>  val invIndex = invertedIndexes(samePartitionKey)
>  iter.map(invIndex.calculateSimilarity(_))
>  )
> }
>
> How could I go about setting up the Partition such that the specific data
> structure I need will be present for the mapPartition but I won't have the
> extra overhead of sending over all values (which would happen if I were to
> make a broadcast variable).
>
> One thought I have been having is to store the objects in HDFS but I'm not
> sure if that would be a suboptimal solution (It seems like it could slow
> down the process a lot)
>
> Another thought I am currently exploring is whether there is some way I can
> create a custom Partition or Partitioner that could hold the data structure
> (Although that might get too complicated and become problematic)
>
> Any thoughts on how I could attack this issue would be highly appreciated.
>
> thank you for your help!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SQL UDF problem (with re to types)

2016-01-13 Thread Ted Yu
Please take a look
at sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
which shows a UserDefinedAggregateFunction that works on a DoubleType column.

sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
shows how it is registered.

Cheers

On Wed, Jan 13, 2016 at 11:58 AM, raghukiran  wrote:

> While registering and using SQL UDFs, I am running into the following
> problem:
>
> UDF registered:
>
> ctx.udf().register("Test", new UDF1() {
> /**
>  *
>  */
> private static final long serialVersionUID =
> -8231917155671435931L;
>
> public String call(Double x) throws Exception {
> return "testing";
> }
> }, DataTypes.StringType);
>
> Usage:
> query = "SELECT Test(82.4)";
> result = sqlCtx.sql(query).first();
> System.out.println(result.toString());
>
> Problem: Class Cast exception thrown
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be
> cast
> to java.lang.Double
>
> This problem occurs with Spark v1.5.2 and 1.6.0.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to get the working directory in executor

2016-01-13 Thread Ted Yu
Can you place metrics.properties and
datainsights-metrics-source-assembly-1.0.jar
on HDFS?

Cheers

On Wed, Jan 13, 2016 at 8:01 AM, Byron Wang  wrote:

> I am using the following command to submit Spark job, I hope to send jar
> and
> config files to each executor and load it there
>
> spark-submit --verbose \
> --files=/tmp/metrics.properties \
> --jars /tmp/datainsights-metrics-source-aembly-1.0.jar \
> --total-executor-cores 4\
> --conf "spark.metrics.conf=metrics.properties" \
> --conf
>
> "spark.executor.extraClassPath=datainsights-metrics-source-assembly-1.0.jar"
> \
> --class org.microsoft.ofe.datainsights.StartServiceSignalPipeline \
> ./target/datainsights-1.0-jar-with-dependencies.jar
>
> --files and --jars is used to send files to executors, I found that the
> files are sent to the working directory of executor like
> 'worker/app-x-/0/
>
> But when job is running, the executor always throws exception saying that
> it
> could not find the file 'metrics.properties' or the class which is contained
> in 'datainsights-metrics-source-assembly-1.0.jar'. It seems that the job is
> looking for files under another dir other than working directory.
>
> Do you know how to load the file which is sent to executors?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-working-directory-in-executor-tp25962.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to get the working directory in executor

2016-01-13 Thread Ted Yu
In a bit more detail:
You upload the files using 'hdfs dfs -copyFromLocal' command
Then specify hdfs location of the files on the command line.

Cheers

On Wed, Jan 13, 2016 at 8:05 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you place metrics.properties and 
> datainsights-metrics-source-assembly-1.0.jar
> on hdfs ?
>
> Cheers
>
> On Wed, Jan 13, 2016 at 8:01 AM, Byron Wang <open...@gmail.com> wrote:
>
>> I am using the following command to submit Spark job, I hope to send jar
>> and
>> config files to each executor and load it there
>>
>> spark-submit --verbose \
>> --files=/tmp/metrics.properties \
>> --jars /tmp/datainsights-metrics-source-aembly-1.0.jar \
>> --total-executor-cores 4\
>> --conf "spark.metrics.conf=metrics.properties" \
>> --conf
>>
>> "spark.executor.extraClassPath=datainsights-metrics-source-assembly-1.0.jar"
>> \
>> --class org.microsoft.ofe.datainsights.StartServiceSignalPipeline \
>> ./target/datainsights-1.0-jar-with-dependencies.jar
>>
>> --files and --jars is used to send files to executors, I found that the
>> files are sent to the working directory of executor like
>> 'worker/app-x-/0/
>>
>> But when job is running, the executor always throws exception saying that
>> it
>> could not find the file 'metrics.properties' or the class which is
>> contained
>> in 'datainsights-metrics-source-assembly-1.0.jar'. It seems that the job
>> is
>> looking for files under another dir other than working directory.
>>
>> Do you know how to load the file which is sent to executors?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-working-directory-in-executor-tp25962.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Ted Yu
Can you show the complete stack trace for the error ?

I searched 1.6.0 code base but didn't find the
class SparkSubmitDriverBootstrapper

Thanks

On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:

> My job runs fine in yarn cluster mode but I have reason to use client mode
> instead. But I'm hitting this error when submitting:
> > spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
> --master yarn --deploy-mode client --executor-memory 90G --num-executors 3
> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
>
> Error: Could not find or load main class
> org.apache.spark.deploy.SparkSubmitDriverBootstrapper
>
>  If I replace deploy-mode to cluster the job is submitted successfully. Is
> there a dependency missing from my project? Right now only one I included
> is spark-streaming 1.6.0.
>


Re: failure to parallelize an RDD

2016-01-12 Thread Ted Yu
Which release of Spark are you using ?

Can you turn on DEBUG logging to see if there is more clue ?

Thanks
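
One other thing that stands out in the log below is the TaskSetManager warning
about a ~47 MB task size: with sc.parallelize, each task carries its slice of
the driver-side array. A small sketch (the slice count is made up) of spreading
the same data over more, smaller partitions:

  // rowChunk: Array[(Int, Array[Float])] built on the driver, as in the code below
  val numSlices = 10000  // hypothetical; aim for slices well under a few MB each
  val parallelRowChunkRDD = sc.parallelize(rowChunk, numSlices).cache()
  parallelRowChunkRDD.count()

The whole array still has to fit on the driver, so the DEBUG logs suggested
above should show whether the driver or an executor is the one giving up.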

On Tue, Jan 12, 2016 at 6:37 PM, AlexG  wrote:

> I transpose a 200-by-54843210 matrix (colChunkOfA), stored as an array of
> rows in Array[Array[Float]] format, into another matrix (rowChunk), also
> stored row-wise as a 54843210-by-200 Array[Array[Float]], using the
> following code:
>
> val rowChunk = new Array[Tuple2[Int,Array[Float]]](numCols)
> val colIndices = (0 until colChunkOfA.length).toArray
>
> (0 until numCols).foreach( rowIdx => {
>   rowChunk(rowIdx) = Tuple2(rowIdx, colIndices.map(colChunkOfA(_)(rowIdx)))
> })
>
> This succeeds, but the following code which attempts to turn rowChunk into
> an RDD fails silently: spark-submit just ends, and none of the executor
> logs
> indicate any errors occurring.
>
> val parallelRowChunkRDD = sc.parallelize(rowChunk).cache
> parallelRowChunkRDD.count
>
> What is the culprit here?
>
> Here is the log output starting from the count instruction:
>
> 16/01/13 02:23:38 INFO SparkContext: Starting job: count at
> transposeAvroToAvroChunks.scala:129
> 16/01/13 02:23:38 INFO DAGScheduler: Got job 3 (count at
> transposeAvroToAvroChunks.scala:129) with 928 output partitions
> 16/01/13 02:23:38 INFO DAGScheduler: Final stage: ResultStage 3(count at
> transposeAvroToAvroChunks.scala:129)
> 16/01/13 02:23:38 INFO DAGScheduler: Parents of final stage: List()
> 16/01/13 02:23:38 INFO DAGScheduler: Missing parents: List()
> 16/01/13 02:23:38 INFO DAGScheduler: Submitting ResultStage 3
> (ParallelCollectionRDD[2448] at parallelize at
> transposeAvroToAvroChunks.scala:128), which has no missing parents
> 16/01/13 02:23:38 INFO MemoryStore: ensureFreeSpace(1048) called with
> curMem=50917367, maxMem=127452201615
> 16/01/13 02:23:38 INFO MemoryStore: Block broadcast_615 stored as values in
> memory (estimated size 1048.0 B, free 118.7 GB)
> 16/01/13 02:23:38 INFO MemoryStore: ensureFreeSpace(740) called with
> curMem=50918415, maxMem=127452201615
> 16/01/13 02:23:38 INFO MemoryStore: Block broadcast_615_piece0 stored as
> bytes in memory (estimated size 740.0 B, free 118.7 GB)
> 16/01/13 02:23:38 INFO BlockManagerInfo: Added broadcast_615_piece0 in
> memory on 172.31.36.112:36581 (size: 740.0 B, free: 118.7 GB)
> 16/01/13 02:23:38 INFO SparkContext: Created broadcast 615 from broadcast
> at
> DAGScheduler.scala:861
> 16/01/13 02:23:38 INFO DAGScheduler: Submitting 928 missing tasks from
> ResultStage 3 (ParallelCollectionRDD[2448] at parallelize at
> transposeAvroToAvroChunks.scala:128)
> 16/01/13 02:23:38 INFO TaskSchedulerImpl: Adding task set 3.0 with 928
> tasks
> 16/01/13 02:23:39 WARN TaskSetManager: Stage 3 contains a task of very
> large
> size (47027 KB). The maximum recommended task size is 100 KB.
> 16/01/13 02:23:39 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
> 1219, 172.31.34.184, PROCESS_LOCAL, 48156290 bytes)
> ...
> 16/01/13 02:27:13 INFO TaskSetManager: Starting task 927.0 in stage 3.0
> (TID
> 2146, 172.31.42.67, PROCESS_LOCAL, 48224789 bytes)
> 16/01/13 02:27:17 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
> 172.31.36.112:36581 in memory (size: 17.4 KB, free: 118.7 GB)
> 16/01/13 02:27:21 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
> 172.31.35.157:51059 in memory (size: 17.4 KB, free: 10.4 GB)
> 16/01/13 02:27:21 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
> 172.31.47.118:34888 in memory (size: 17.4 KB, free: 10.4 GB)
> 16/01/13 02:27:22 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
> 172.31.38.42:48582 in memory (size: 17.4 KB, free: 10.4 GB)
> 16/01/13 02:27:38 INFO BlockManagerInfo: Added broadcast_615_piece0 in
> memory on 172.31.41.68:59281 (size: 740.0 B, free: 10.4 GB)
> 16/01/13 02:27:55 INFO BlockManagerInfo: Added broadcast_615_piece0 in
> memory on 172.31.47.118:59575 (size: 740.0 B, free: 10.4 GB)
> 16/01/13 02:28:47 INFO BlockManagerInfo: Added broadcast_615_piece0 in
> memory on 172.31.40.24:55643 (size: 740.0 B, free: 10.4 GB)
> 16/01/13 02:28:49 INFO BlockManagerInfo: Added broadcast_615_piece0 in
> memory on 172.31.47.118:53671 (size: 740.0 B, free: 10.4 GB)
>
> This is the end of the log, so it looks like all 928 tasks got started, but
> presumably somewhere in running, they ran into an error. Nothing shows up
> in
> the executor logs.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/failure-to-parallelize-an-RDD-tp25950.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Windows driver cannot run job on Linux cluster

2016-01-11 Thread Ted Yu
Which release of Spark are you using ?

Can you pastebin stack trace of executor(s) so that we can have some more
clue ?

Thanks

On Mon, Jan 11, 2016 at 1:10 PM, Andrew Wooster 
wrote:

> I have a very simple program that runs fine on my Linux server that runs
> Spark master and worker in standalone mode.
>
> public class SimpleSpark {
>     public int sum() {
>         SparkConf conf = new SparkConf()
>                 .setAppName("Magellan")
>                 .setMaster("spark://ec2-nnn-nnn-nnn-nnn.compute-1.amazonaws.com:11407")
>                 .setJars(new String[] {"target/magellan-spark-1.0-SNAPSHOT.jar"});
>         JavaSparkContext sc = new JavaSparkContext(conf);
>
>         List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
>         JavaRDD<Integer> distData = sc.parallelize(data);
>         int total = distData.reduce(new SumFunc());
>         return total;
>     }
>
>     public static class SumFunc implements Function2<Integer, Integer, Integer> {
>         public Integer call(Integer a, Integer b) {
>             return a + b;
>         }
>     };
>
> However, when I run the same driver from a Windows machine it outputs the
> following message and never completes:
>   16/01/11 20:51:11 WARN TaskSchedulerImpl: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient resources
>
> I have checked the cluster UI and the job is marked as RUNNING (so it does
> not appears to be waiting on a worker).  I do not see anything out of the
> ordinary in the master and worker logs.
>
> How do I debug a problem like this?
> -Andrew
>


Re: partitioning RDD

2016-01-11 Thread Ted Yu
Hi,
Please use proper subject when sending email to user@

In your example below, what do the values inside curly braces represent ?
I assume not the keys since values for same key should go to the same
partition.

Cheers

On Mon, Jan 11, 2016 at 10:51 AM, Daniel Imberman  wrote:

> Hi all,
>
> I'm looking for a way to efficiently partition an RDD, but allow the same
> data to exists on multiple partitions.
>
>
> Lets say I have a key-value RDD with keys {1,2,3,4}
>
> I want to be able to to repartition the RDD so that so the partitions look
> like
>
> p1 = {1,2}
> p2 = {2,3}
> p3 = {3,4}
>
> Locality is important in this situation as I would be doing internal
> comparison values.
>
> Does anyone have any thoughts as to how I could go about doing this?
>
> Thank you
>
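
For illustration, a minimal Scala sketch (assuming an existing SparkContext sc)
of one way to get this kind of overlap: duplicate each record into every bucket
it should be visible in, then partition by the bucket id. The bucket layout is
just the {1,2}/{2,3}/{3,4} example from the question.

  import org.apache.spark.HashPartitioner

  // toy key-value data with keys 1..4
  val data = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d"))

  // each key lists the buckets (partitions) it should appear in
  val buckets = Map(1 -> Seq(0), 2 -> Seq(0, 1), 3 -> Seq(1, 2), 4 -> Seq(2))

  val overlapping = data
    .flatMap { case (k, v) => buckets(k).map(b => (b, (k, v))) } // duplicate per bucket
    .partitionBy(new HashPartitioner(3))                         // co-locate each bucket

  // pairwise comparisons can now run inside a partition via mapPartitions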


Re: Best IDE Configuration

2016-01-10 Thread Ted Yu
For python, there is https://gist.github.com/bigaidream/40fe0f8267a80e7c9cf8
which was mentioned in http://search-hadoop.com/m/q3RTt2Eu941D9H9t1

FYI

On Sat, Jan 9, 2016 at 11:24 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Please take a look at:
> https://cwiki.apache.org/confluence/display/SPARK/
> Useful+Developer+Tools#UsefulDeveloperTools-IDESetup
>
> On Sat, Jan 9, 2016 at 11:16 AM, Jorge Machado <jom...@me.com> wrote:
>
>> Hello everyone,
>>
>>
>> I´m just wondering how do you guys develop for spark.
>>
>> For example I cannot find any decent documentation for connecting Spark
>> to Eclipse using maven or sbt.
>>
>> Is there any link around ?
>>
>>
>> Jorge
>> thanks
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Ted Yu
See the first half of this wiki:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
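
If installing the LZO natives (as that wiki describes) is not an option, the
codec can be switched to one Spark ships with. A minimal sketch from a Scala
shell; eventDF and the output path are placeholders:

  // accepted values in 1.5/1.6: uncompressed, snappy, gzip, lzo
  sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

  // subsequent Parquet writes pick up the codec
  eventDF.write.parquet("/data/events_snappy")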

> On Jan 9, 2016, at 1:02 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:
> 
> So I tried to set the parquet compression codec to lzo, but hadoop does not
> have the lzo natives, while lz4 is included.
> But I could not set the codec to lz4; it only accepts lzo.
> 
> Any solution here?
> 
> Thank,
> Gavin
> 
> 
> 
>> On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>> I saw in the document, the value is LZO. Is it LZO or LZ4?
>> 
>> https://github.com/Cyan4973/lz4
>> 
>> Based on this benchmark, they differ quite a lot. 
>> 
>> 
>> 
>>> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> gzip is relatively slow. It consumes much CPU.
>>> 
>>> snappy is faster.
>>> 
>>> LZ4 is faster than GZIP and smaller than Snappy.
>>> 
>>> Cheers
>>> 
>>>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>> Thank you .
>>>> 
>>>> And speaking of compression, is there big difference on performance 
>>>> between gzip and snappy? And why parquet is using gzip by default?
>>>> 
>>>> Thanks.
>>>> 
>>>> 
>>>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>> Cycling old bits:
>>>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>>> 
>>>>> Gavin:
>>>>> Which release of hbase did you play with ?
>>>>> 
>>>>> HBase has been evolving and is getting more stable.
>>>>> 
>>>>> Cheers
>>>>> 
>>>>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>> I used to maintain a HBase cluster. The experience with it was not 
>>>>>> happy. 
>>>>>> 
>>>>>> I just tried query the data  from each day's first and dedup with 
>>>>>> smaller set, the performance is acceptable.  So I guess I will use this 
>>>>>> method. 
>>>>>> 
>>>>>> Again, could anyone give advice about: 
>>>>>> Automatically determine the number of reducers for joins and groupbys: 
>>>>>> Currently in Spark SQL, you need to control the degree of parallelism 
>>>>>> post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>> Thanks.
>>>>>> 
>>>>>> Gavin
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>> bq. in an noSQL db such as Hbase
>>>>>>> 
>>>>>>> +1 :-)
>>>>>>> 
>>>>>>> 
>>>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>>>>>> One option you may want to explore is writing event table in an noSQL 
>>>>>>>> db such as Hbase. One inherent problem in your approach is you always 
>>>>>>>> need to load either full data set or a defined number of partitions to 
>>>>>>>> see if the event has already come (and no gurantee it is full proof, 
>>>>>>>> but lead to unnecessary loading in most cases).
>>>>>>>> 
>>>>>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> 
>>>>>>>>> wrote:
>>>>>>>>> Hey, 
>>>>>>>>> Thank you for the answer. I checked the setting you mentioend they 
>>>>>>>>> are all correct.  I noticed that in the job, there are always only 
>>>>>>>>> 200 reducers for shuffle read, I believe it is setting in the sql 
>>>>>>>>> shuffle parallism. 
>>>>>>>>> 
>>>>>>>>> In the doc, it mentions: 
>>>>>>>>> Automatically determine the number of reducers for joins and 
>>>>>>>>> groupbys: Currently in Spark SQL, you need to control the degree of 
>>>>>>>>> parallelism post-shuffle using “SET 
>>>>>>>>> spark.sql.shuffle.partitions=[num_t

Re: Best IDE Configuration

2016-01-09 Thread Ted Yu
Please take a look at:
https://cwiki.apache.org/confluence/display/SPARK/
Useful+Developer+Tools#UsefulDeveloperTools-IDESetup

On Sat, Jan 9, 2016 at 11:16 AM, Jorge Machado  wrote:

> Hello everyone,
>
>
> I´m just wondering how do you guys develop for spark.
>
> For example I cannot find any decent documentation for connecting Spark to
> Eclipse using maven or sbt.
>
> Is there any link around ?
>
>
> Jorge
> thanks
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Ted Yu
For "spark.shuffle.manager", the default is "sort"
From core/src/main/scala/org/apache/spark/SparkEnv.scala :

val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")

"tungsten-sort" is the same as "sort" :

val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

FYI

On Fri, Jan 8, 2016 at 12:59 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> ok thanks so it will be enabled by default always if yes then in
> documentation why default shuffle manager is mentioned as sort?
>
> On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> From
>> sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala :
>>
>> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
>>   val runFunc = (sqlContext: SQLContext) => {
>> logWarning(
>>   s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated
>> and " +
>> s"will be ignored. Tungsten will continue to be used.")
>> Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
>>   }
>>
>> FYI
>>
>> On Fri, Jan 8, 2016 at 12:21 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6.
>>> I don't see any difference; I was expecting Spark 1.6 to be faster. Anyway,
>>> do we need to enable the Tungsten and unsafe options, or are they enabled
>>> by default? I see in the documentation that the default sort manager is
>>> "sort"; I thought it was Tungsten, no? Please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Ted Yu
From sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala :

case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
  val runFunc = (sqlContext: SQLContext) => {
logWarning(
  s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated
and " +
s"will be ignored. Tungsten will continue to be used.")
Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
  }

FYI

On Fri, Jan 8, 2016 at 12:21 PM, unk1102  wrote:

> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark 1.6.
> I don't see any difference; I was expecting Spark 1.6 to be faster. Anyway,
> do we need to enable the Tungsten and unsafe options, or are they enabled by
> default? I see in the documentation that the default sort manager is "sort";
> I thought it was Tungsten, no? Please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Is your Parquet data source partitioned by date ?

Can you dedup within partitions ?

Cheers
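
To make "dedup within partitions" concrete, a rough sketch using the column
names Gavin mentions further down (assuming Spark 1.5+ and that a date can be
derived from TimeStamp; the paths are placeholders):

  import org.apache.spark.sql.functions.to_date

  val day1 = sqlContext.read.parquet("/events/2016-01-07")
  val day2 = sqlContext.read.parquet("/events/2016-01-08")

  // dedup on the identifying columns instead of distinct() over whole rows
  val merged = day1.unionAll(day2)
    .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))

  // write partitioned by a derived date so later merges only touch affected dates
  merged.withColumn("date", to_date(merged("TimeStamp")))
    .write.partitionBy("date").parquet("/events/merged")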

On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue  wrote:

> I tried on three days' data.  The total input is only 980GB, but the
> shuffle write data is about 6.2TB, then the job failed during the shuffle read
> step, which should be another 6.2TB shuffle read.
>
> I think for dedup the shuffling cannot be avoided. Is there anything I
> could do to stabilize this process?
>
> Thanks.
>
>
> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue  wrote:
>
>> Hey,
>>
>> I get each day's Event table and want to merge them into a single Event
>> table, but there are so many duplicates among each day's data.
>>
>> I use Parquet as the data source.  What I am doing now is
>>
>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
>> file").
>>
>> Each day's Event is stored in their own Parquet file
>>
>> But it failed at the stage2 which keeps losing connection to one
>> executor. I guess this is due to the memory issue.
>>
>> Any suggestion how I do this efficiently?
>>
>> Thanks,
>> Gavin
>>
>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Benyi:

bq. spark 1.5.2 gave me a wrong result when the data was about 300~400GB,
just for a simple group-by and aggregate

Can you reproduce the above using Spark 1.6.0 ?

Thanks
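
For reference, a minimal sketch of the groupBy-style dedup Benyi suggests below
(events is a placeholder DataFrame with the schema Gavin describes; whether
first() is acceptable depends on whether duplicates can differ in MetaData):

  import org.apache.spark.sql.functions.first

  val deduped = events
    .groupBy("UserID", "EventType", "EventKey", "TimeStamp")
    .agg(first("MetaData").as("MetaData"))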

On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

>
>- I assume your parquet files are compressed. Gzip or Snappy?
>- What spark version did you use? It seems at least 1.4. If you use
>spark-sql and tungsten, you might have better performance. but spark 1.5.2
>gave me a wrong result when the data was about 300~400GB, just for a simple
>group-by and aggregate.
>- Did you use kyro serialization?
>- you should have spark.shuffle.compress=true, verify it.
>- How many tasks did you use? spark.default.parallelism=?
>- What about this:
>   - Read the data day by day
>   - compute a bucket id from timestamp, e.g., the date and hour
>   - Write into different buckets (you probably need a special writer
>   to write data efficiently without shuffling the data).
>   - distinct for each bucket. Because each bucket is small, spark can
>   get it done faster than having everything in one run.
>   - I think using groupBy (userId, timestamp) might be better than
>   distinct. I guess distinct() will compare every field.
>
>
> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> And the most frequent operation I am gonna do is find the UserID who have
>> some events, then retrieve all the events associted with the UserID.
>>
>> In this case, how should I partition to speed up the process?
>>
>> Thanks.
>>
>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> hey Ted,
>>>
>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>> the partition.
>>>
>>> Annoyingly, every day's incoming Event data having duplicates among each
>>> other.  One same event could show up in Day1 and Day2 and probably Day3.
>>>
>>> I only want to keep single Event table and each day it come so many
>>> duplicates.
>>>
>>> Is there a way I could just insert into Parquet and if duplicate found,
>>> just ignore?
>>>
>>> Thanks,
>>> Gavin
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Is your Parquet data source partitioned by date ?
>>>>
>>>> Can you dedup within partitions ?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> I tried on Three day's data.  The total input is only 980GB, but the
>>>>> shuffle write Data is about 6.2TB, then the job failed during shuffle read
>>>>> step, which should be another 6.2TB shuffle read.
>>>>>
>>>>> I think to Dedup, the shuffling can not be avoided. Is there anything
>>>>> I could do to stablize this process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I got everyday's Event table and want to merge them into a single
>>>>>> Event table. But there so many duplicates among each day's data.
>>>>>>
>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>
>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
>>>>>> file").
>>>>>>
>>>>>> Each day's Event is stored in their own Parquet file
>>>>>>
>>>>>> But it failed at the stage2 which keeps losing connection to one
>>>>>> executor. I guess this is due to the memory issue.
>>>>>>
>>>>>> Any suggestion how I do this efficiently?
>>>>>>
>>>>>> Thanks,
>>>>>> Gavin
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
gzip is relatively slow. It consumes much CPU.

snappy is faster.

LZ4 is faster than GZIP and smaller than Snappy.

Cheers

On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Thank you .
>
> And speaking of compression, is there big difference on performance
> between gzip and snappy? And why parquet is using gzip by default?
>
> Thanks.
>
>
> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Cycling old bits:
>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>
>> Gavin:
>> Which release of hbase did you play with ?
>>
>> HBase has been evolving and is getting more stable.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> I used to maintain a HBase cluster. The experience with it was not
>>> happy.
>>>
>>> I just tried query the data  from each day's first and dedup with
>>> smaller set, the performance is acceptable.  So I guess I will use this
>>> method.
>>>
>>> Again, could anyone give advice about:
>>>
>>>- Automatically determine the number of reducers for joins and
>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>parallelism post-shuffle using “SET
>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>> Thanks.
>>>
>>> Gavin
>>>
>>>
>>>
>>>
>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> bq. in an noSQL db such as Hbase
>>>>
>>>> +1 :-)
>>>>
>>>>
>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> One option you may want to explore is writing event table in an noSQL
>>>>> db such as Hbase. One inherent problem in your approach is you always need
>>>>> to load either full data set or a defined number of partitions to see if
>>>>> the event has already come (and no gurantee it is full proof, but lead to
>>>>> unnecessary loading in most cases).
>>>>>
>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>> Thank you for the answer. I checked the setting you mentioend they
>>>>>> are all correct.  I noticed that in the job, there are always only 200
>>>>>> reducers for shuffle read, I believe it is setting in the sql shuffle
>>>>>> parallism.
>>>>>>
>>>>>> In the doc, it mentions:
>>>>>>
>>>>>>- Automatically determine the number of reducers for joins and
>>>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>>parallelism post-shuffle using “SET
>>>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>>
>>>>>>
>>>>>> What would be the ideal number for this setting? Is it based on the
>>>>>> hardware of cluster?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Gavin
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>>>>>- What spark version did you use? It seems at least 1.4. If you
>>>>>>>use spark-sql and tungsten, you might have better performance. but 
>>>>>>> spark
>>>>>>>1.5.2 gave me a wrong result when the data was about 300~400GB, just 
>>>>>>> for a
>>>>>>>simple group-by and aggregate.
>>>>>>>- Did you use kyro serialization?
>>>>>>>- you should have spark.shuffle.compress=true, verify it.
>>>>>>>- How many tasks did you use? spark.default.parallelism=?
>>>>>>>- What about this:
>>>>>>>   - Read the data day by day
>>>>>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>>>>>   - Write into different buckets (you probably need a special
>>>>>>>   writer to write data e

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
bq. in an noSQL db such as Hbase

+1 :-)


On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:

> One option you may want to explore is writing event table in an noSQL db
> such as Hbase. One inherent problem in your approach is you always need to
> load either full data set or a defined number of partitions to see if the
> event has already come (and no gurantee it is full proof, but lead to
> unnecessary loading in most cases).
>
> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Hey,
>> Thank you for the answer. I checked the setting you mentioend they are
>> all correct.  I noticed that in the job, there are always only 200 reducers
>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>
>> In the doc, it mentions:
>>
>>- Automatically determine the number of reducers for joins and
>>groupbys: Currently in Spark SQL, you need to control the degree of
>>parallelism post-shuffle using “SET
>>spark.sql.shuffle.partitions=[num_tasks];”.
>>
>>
>> What would be the ideal number for this setting? Is it based on the
>> hardware of cluster?
>>
>>
>> Thanks,
>>
>> Gavin
>>
>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>>
>>>
>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>- What spark version did you use? It seems at least 1.4. If you use
>>>spark-sql and tungsten, you might have better performance. but spark 
>>> 1.5.2
>>>gave me a wrong result when the data was about 300~400GB, just for a 
>>> simple
>>>group-by and aggregate.
>>>- Did you use kyro serialization?
>>>- you should have spark.shuffle.compress=true, verify it.
>>>- How many tasks did you use? spark.default.parallelism=?
>>>- What about this:
>>>   - Read the data day by day
>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>   - Write into different buckets (you probably need a special
>>>   writer to write data efficiently without shuffling the data).
>>>   - distinct for each bucket. Because each bucket is small, spark
>>>   can get it done faster than having everything in one run.
>>>   - I think using groupBy (userId, timestamp) might be better than
>>>   distinct. I guess distinct() will compare every field.
>>>
>>>
>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> And the most frequent operation I am gonna do is find the UserID who
>>>> have some events, then retrieve all the events associted with the UserID.
>>>>
>>>> In this case, how should I partition to speed up the process?
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> hey Ted,
>>>>>
>>>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>>>> the partition.
>>>>>
>>>>> Annoyingly, every day's incoming Event data having duplicates among
>>>>> each other.  One same event could show up in Day1 and Day2 and probably
>>>>> Day3.
>>>>>
>>>>> I only want to keep single Event table and each day it come so many
>>>>> duplicates.
>>>>>
>>>>> Is there a way I could just insert into Parquet and if duplicate
>>>>> found, just ignore?
>>>>>
>>>>> Thanks,
>>>>> Gavin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Is your Parquet data source partitioned by date ?
>>>>>>
>>>>>> Can you dedup within partitions ?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I tried on Three day's data.  The total input is only 980GB, but the
>>>>>>> shuffle write Data is about 6.2TB, then the job failed during shuffle 
>>>>>>> read
>>>>>>> step, which should be another 6.2TB shuffle read.
>>>>>>>
>>>>>>> I think to Dedup, the shuffling can not be avoided. Is there
>>>>>>> anything I could do to stablize this process?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> I got everyday's Event table and want to merge them into a single
>>>>>>>> Event table. But there so many duplicates among each day's data.
>>>>>>>>
>>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>>
>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new
>>>>>>>> parquet file").
>>>>>>>>
>>>>>>>> Each day's Event is stored in their own Parquet file
>>>>>>>>
>>>>>>>> But it failed at the stage2 which keeps losing connection to one
>>>>>>>> executor. I guess this is due to the memory issue.
>>>>>>>>
>>>>>>>> Any suggestion how I do this efficiently?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Gavin
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Ted Yu
Cycling old bits:
http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ

Gavin:
Which release of hbase did you play with ?

HBase has been evolving and is getting more stable.

Cheers

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> I used to maintain a HBase cluster. The experience with it was not happy.
>
> I just tried query the data  from each day's first and dedup with smaller
> set, the performance is acceptable.  So I guess I will use this method.
>
> Again, could anyone give advice about:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin
>
>
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq. in an noSQL db such as Hbase
>>
>> +1 :-)
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> One option you may want to explore is writing event table in an noSQL db
>>> such as Hbase. One inherent problem in your approach is you always need to
>>> load either full data set or a defined number of partitions to see if the
>>> event has already come (and no gurantee it is full proof, but lead to
>>> unnecessary loading in most cases).
>>>
>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>> Thank you for the answer. I checked the setting you mentioend they are
>>>> all correct.  I noticed that in the job, there are always only 200 reducers
>>>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>>>
>>>> In the doc, it mentions:
>>>>
>>>>- Automatically determine the number of reducers for joins and
>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>parallelism post-shuffle using “SET
>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>
>>>>
>>>> What would be the ideal number for this setting? Is it based on the
>>>> hardware of cluster?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Gavin
>>>>
>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>>>- What spark version did you use? It seems at least 1.4. If you
>>>>>use spark-sql and tungsten, you might have better performance. but 
>>>>> spark
>>>>>1.5.2 gave me a wrong result when the data was about 300~400GB, just 
>>>>> for a
>>>>>simple group-by and aggregate.
>>>>>- Did you use kyro serialization?
>>>>>- you should have spark.shuffle.compress=true, verify it.
>>>>>- How many tasks did you use? spark.default.parallelism=?
>>>>>- What about this:
>>>>>   - Read the data day by day
>>>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>>>   - Write into different buckets (you probably need a special
>>>>>   writer to write data efficiently without shuffling the data).
>>>>>   - distinct for each bucket. Because each bucket is small, spark
>>>>>   can get it done faster than having everything in one run.
>>>>>   - I think using groupBy (userId, timestamp) might be better
>>>>>   than distinct. I guess distinct() will compare every field.
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> And the most frequent operation I am gonna do is find the UserID who
>>>>>> have some events, then retrieve all the events associted with the UserID.
>>>>>>
>>>>>> In this case, how should I partition to speed up the process?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> hey Ted,
>>>>>>>
>>>>>>> Event table is like this: UserID, EventType, Ev

Re: write new data to mysql

2016-01-08 Thread Ted Yu
Which Spark release are you using ?

For case #2, was there any error / clue in the logs ?

Cheers
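
If you are on 1.4 or later, the DataFrameWriter path may be worth trying
instead of insertIntoJDBC. A minimal sketch; the connection properties are
placeholders:

  import java.util.Properties
  import org.apache.spark.sql.SaveMode

  val props = new Properties()
  props.setProperty("user", "dbuser")
  props.setProperty("password", "dbpass")

  // Append keeps the existing rows in track_on_alarm and adds the new ones
  peopleDataFrame.write
    .mode(SaveMode.Append)
    .jdbc(MYSQL_CONNECTION_URL_WRITE, "track_on_alarm", props)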

On Fri, Jan 8, 2016 at 7:36 AM, Yasemin Kaya  wrote:

> Hi,
>
> I want to write dataframe existing mysql table, but when i use
> *peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
> "track_on_alarm",false)*
>
> it says "Table track_on_alarm already exists."
>
> And when i *use peopleDataFrame.insertIntoJDBC(MYSQL_CONNECTION_URL_WRITE,
> "track_on_alarm",true)*
>
> i lost the existing data.
>
> How can I write new data to the db?
>
> Best,
> yasemin
>
> --
> hiç ender hiç
>


Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Ted Yu
bq. try adding scala.collection.mutable.WrappedArray

But the hint said to register scala.collection.mutable.WrappedArray$ofRef.class,
right?
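
In Scala the class from that hint can be registered directly; a small sketch
(DropResult, DropEvaluation and PrintHetSharing are the poster's own classes
from the message below):

  import org.apache.spark.SparkConf

  val kryoClassArray: Array[Class[_]] = Array(
    classOf[DropResult],
    classOf[DropEvaluation],
    classOf[PrintHetSharing],
    // the class named in the "Class is not registered" hint
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
  )

  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
    .registerKryoClasses(kryoClassArray)

In Java, the same class object can be obtained with
Class.forName("scala.collection.mutable.WrappedArray$ofRef").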

On Fri, Jan 8, 2016 at 8:52 AM, jiml  wrote:

> (point of post is to see if anyone has ideas about errors at end of post)
>
> In addition, the real way to test if it's working is to force
> serialization:
>
> In Java:
>
> Create array of all your classes:
> // for kyro serializer it wants to register all classes that need to be
> serialized
> Class[] kryoClassArray = new Class[]{DropResult.class,
> DropEvaluation.class,
> PrintHetSharing.class};
>
> in the builder for your SparkConf (or in conf/spark-defaults.sh)
> .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> //require registration of all classes with Kyro
> .set("spark.kryo.registrationRequired", "true")
> // don't forget to register ALL classes or will get error
> .registerKryoClasses(kryoClassArray);
>
> Then you will start to get neat errors like the one I am working on:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Failed to serialize task 0, not attempting to retry it.
> Exception during serialization: java.io.IOException:
> java.lang.IllegalArgumentException: Class is not registered:
> scala.collection.mutable.WrappedArray$ofRef
> Note: To register this class use:
> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
>
> I did try adding scala.collection.mutable.WrappedArray to the Class array
> up
> top but no luck. Thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Window Functions importing issue in Spark 1.4.0

2016-01-07 Thread Ted Yu
Please take a look at the following for sample on how rowNumber is used:
https://github.com/apache/spark/pull/9050

BTW 1.4.0 was an old release.

Please consider upgrading.
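
For reference, once the imports resolve (they exist in spark-sql 1.4.0+; note
that in 1.4/1.5 window functions also need a HiveContext at runtime), the usage
looks roughly like this sketch, where df, "grp" and "ts" are placeholders:

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.rowNumber

  val w = Window.partitionBy("grp").orderBy("ts")

  // adds a 1-based sequential number within each group
  val numbered = df.withColumn("seq", rowNumber().over(w))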

On Thu, Jan 7, 2016 at 3:04 AM, satish chandra j 
wrote:

> HI All,
> Currently using Spark 1.4.0 version, I have a requirement to add a column
> having Sequential Numbering to an existing DataFrame
> I understand Window Function "rowNumber" serves my purpose
> hence I have below import statements to include the same
>
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.rowNumber
>
> But I am getting an error at the import statement itself such as
> "object expressions is not a member of package org.apache.spark.sql"
>
> "value rowNumber is not a member of object org.apache.spark.sql.functions"
>
> Could anybody throw some light on how to fix the issue?
>
> Regards,
> Satish Chandra
>


Re: spark ui security

2016-01-07 Thread Ted Yu
According to https://spark.apache.org/docs/latest/security.html#web-ui ,
web UI is covered.

FYI

On Thu, Jan 7, 2016 at 6:35 AM, Kostiantyn Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> hi community,
>
> do I understand correctly that the spark.ui.filters property sets up filters
> only for the job UI interface? Is there any way to protect the Spark web UI
> in the same way?
>


Re: spark ui security

2016-01-07 Thread Ted Yu
Without kerberos you don't have true security.

Cheers

On Thu, Jan 7, 2016 at 1:56 PM, Kostiantyn Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> can I do it without kerberos and hadoop?
> ideally using filters as for job UI
>
> On Jan 7, 2016, at 1:22 PM, Prem Sure <premsure...@gmail.com> wrote:
>
> you can refer more on https://searchcode.com/codesearch/view/97658783/
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SecurityManager.scala
>
> spark.authenticate = true
> spark.ui.acls.enable = true
> spark.ui.view.acls = user1,user2
> spark.ui.filters =
> org.apache.hadoop.security.authentication.server.AuthenticationFilter
>
> spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=kerberos,kerberos.principal=HTTP/mybox@MYDOMAIN
> ,kerberos.keytab=/some/keytab"
>
>
>
>
> On Thu, Jan 7, 2016 at 10:35 AM, Kostiantyn Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
>> I’m afraid I missed where this property must be specified? I added it to
>> spark-xxx.conf which is basically configurable per job, so I assume to
>> protect WebUI the different place must be used, isn’t it?
>>
>> On Jan 7, 2016, at 10:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> According to https://spark.apache.org/docs/latest/security.html#web-ui ,
>> web UI is covered.
>>
>> FYI
>>
>> On Thu, Jan 7, 2016 at 6:35 AM, Kostiantyn Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>>> hi community,
>>>
>>> do I understand correctly that spark.ui.filters property sets up
>>> filters only for jobui interface? is it any way to protect spark web ui in
>>> the same *way?*
>>>
>>
>>
>>
>
>


Re: problem building spark on centos

2016-01-06 Thread Ted Yu
w.r.t. the second error, have you read this ?
http://www.captaindebug.com/2013/03/mavens-non-resolvable-parent-pom-problem.html#.Vo1fFGSrSuo

On Wed, Jan 6, 2016 at 9:49 AM, Jade Liu <jade@nor1.com> wrote:

> I’m using 3.3.9. Thanks!
>
> Jade
>
> From: Ted Yu <yuzhih...@gmail.com>
> Date: Tuesday, January 5, 2016 at 4:57 PM
> To: Jade Liu <jade@nor1.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: problem building spark on centos
>
> Which version of maven are you using ?
>
> It should be 3.3.3+
>
> On Tue, Jan 5, 2016 at 4:54 PM, Jade Liu <jade@nor1.com> wrote:
>
>> Hi, All:
>>
>> I’m trying to build spark 1.5.2 from source using maven with the
>> following command:
>>
>> ./make-distribution.sh --tgz -Phadoop-2.6 -Pyarn -Dhadoop.version=2.6.0
>> -Dscala-2.11 -Phive -Phive-thriftserver -DskipTests
>>
>>
>> I got the following error:
>>
>> + VERSION='[ERROR] [Help 2]
>> http://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException
>> '
>>
>>
>> When I try:
>>
>> build/mvn -X -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean
>> package
>>
>>
>> I got the following error:
>>
>> [FATAL] Non-resolvable parent POM for
>> org.apache.spark:spark-parent_2.11:1.5.2: Could not transfer artifact
>> org.apache:apache:pom:14 from/to central (https://repo1.maven.org/maven2):
>> java.security.ProviderException: java.security.KeyException and
>> 'parent.relativePath' points at wrong local POM @ line 22, column 11
>>
>>
>> Does anyone know how to change the settings in maven to fix this?
>>
>>
>> Thanks in advance,
>>
>>
>> Jade
>>
>
>


Re: How to insert df in HBASE

2016-01-06 Thread Ted Yu
Cycling prior discussion:

http://search-hadoop.com/m/q3RTtX7POh17hqdj1
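
One common pattern, sketched only (assuming the HBase 1.x client on the
classpath, a table "events" with column family "cf", and a DataFrame whose
first two string columns are the row key and the value):

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
  import org.apache.hadoop.hbase.util.Bytes

  df.foreachPartition { rows =>
    // one connection per partition, not per row
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("events"))
    rows.foreach { row =>
      val put = new Put(Bytes.toBytes(row.getString(0)))           // row key
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"),   // cf:value
        Bytes.toBytes(row.getString(1)))
      table.put(put)
    }
    table.close()
    conn.close()
  }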

On Wed, Jan 6, 2016 at 3:07 AM, Sadaf  wrote:

> HI,
>
> I need to insert a DataFrame into HBase using Scala code.
> Can anyone guide me on how to achieve this?
>
> Any help would be much appreciated.
> Thanks
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-df-in-HBASE-tp25891.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: What should be the ideal value(unit) for spark.memory.offheap.size

2016-01-06 Thread Ted Yu
Turns out that I should have specified -i to my former grep command :-)

Thanks Marcelo

But does this mean that specifying a custom value for the parameter
spark.memory.offheap.size
would not take effect?

Cheers
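
For what it's worth, the case-sensitive search is exactly why it did not show
up: the 1.6 property is camelCase. A minimal sketch of setting it, assuming the
5 GB intent from the question below:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // both settings are needed; the size is ignored while enabled stays false
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "5g")  // interpreted as bytes; a size suffix like 5g works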

On Wed, Jan 6, 2016 at 2:47 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> Try "git grep -i spark.memory.offheap.size"...
>
> On Wed, Jan 6, 2016 at 2:45 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > Maybe I looked in the wrong files - I searched *.scala and *.java files
> (in
> > latest Spark 1.6.0 RC) for '.offheap.' but didn't find the config.
> >
> > Can someone enlighten me ?
> >
> > Thanks
> >
> > On Wed, Jan 6, 2016 at 2:35 PM, Jakob Odersky <joder...@gmail.com>
> wrote:
> >>
> >> Check the configuration guide for a description on units
> >> (
> http://spark.apache.org/docs/latest/configuration.html#spark-properties).
> >> In your case, 5GB would be specified as 5g.
> >>
> >> On 6 January 2016 at 10:29, unk1102 <umesh.ka...@gmail.com> wrote:
> >>>
> >>> Hi, as part of the Spark 1.6 release, what should be the ideal value or
> >>> unit for spark.memory.offheap.size? I have set it as 5000; I assume it
> >>> will be 5GB, is that correct? Please guide.
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>>
> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-ideal-value-unit-for-spark-memory-offheap-size-tp25898.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> -
> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>> For additional commands, e-mail: user-h...@spark.apache.org
> >>>
> >>
> >
>
>
>
> --
> Marcelo
>


Re: What should be the ideal value(unit) for spark.memory.offheap.size

2016-01-06 Thread Ted Yu
Maybe I looked in the wrong files - I searched *.scala and *.java files (in
latest Spark 1.6.0 RC) for '.offheap.' but didn't find the config.

Can someone enlighten me ?

Thanks

On Wed, Jan 6, 2016 at 2:35 PM, Jakob Odersky  wrote:

> Check the configuration guide for a description on units (
> http://spark.apache.org/docs/latest/configuration.html#spark-properties).
> In your case, 5GB would be specified as 5g.
>
> On 6 January 2016 at 10:29, unk1102  wrote:
>
>> Hi, as part of the Spark 1.6 release, what should be the ideal value or unit
>> for spark.memory.offheap.size? I have set it as 5000; I assume it will be
>> 5GB, is that correct? Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-ideal-value-unit-for-spark-memory-offheap-size-tp25898.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: org.apache.spark.storage.BlockNotFoundException in Spark1.5.2+Tachyon0.7.1

2016-01-06 Thread Ted Yu
Have you seen this thread ?

http://search-hadoop.com/m/q3RTtAiQta22XrCI

On Wed, Jan 6, 2016 at 8:41 PM, Jia Zou  wrote:

> Dear all,
>
> I am using Spark1.5.2 and Tachyon0.7.1 to run KMeans with
> inputRDD.persist(StorageLevel.OFF_HEAP()).
>
> I've set tiered storage for Tachyon. It is all right when the working set is
> smaller than the available memory. However, when the working set exceeds the
> available memory, I keep getting errors like the ones below:
>
> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.1 in stage
> 0.0 (TID 206) on executor 10.149.11.81: java.lang.RuntimeException
> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>
> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.1 in stage
> 0.0 (TID 207) on executor 10.149.11.81: java.lang.RuntimeException
> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
>
> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.2 in stage
> 0.0 (TID 208) on executor 10.149.11.81: java.lang.RuntimeException
> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>
> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.2 in stage
> 0.0 (TID 209) on executor 10.149.11.81: java.lang.RuntimeException
> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
>
> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.3 in stage
> 0.0 (TID 210) on executor 10.149.11.81: java.lang.RuntimeException
> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>
>
> Can any one give me some suggestions? Thanks a lot!
>
>
> Best Regards,
> Jia
>


Re: Spark Token Expired Exception

2016-01-06 Thread Ted Yu
Which Spark / hadoop release are you using ?

Thanks

On Wed, Jan 6, 2016 at 12:16 PM, Nikhil Gs 
wrote:

> Hello Team,
>
>
> Thank you for your time in advance.
>
>
> Below are the log lines of my Spark job, which consumes messages from a Kafka
> instance and loads them to HBase. I noticed the WARN lines below, and later
> they resulted in errors. Exactly after 7 days the token expires, and the job
> tries to renew the token but is not able to, even after retrying. Mine is a
> Kerberos cluster. Can you please look into it and advise what the issue is?
>
>
> Your time and suggestions are very valuable.
>
>
>
> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Finished job streaming job
> 145141043 ms.0 from job set of time 145141043 ms
>
> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Starting job streaming job
> 145141043 ms.1 from job set of time 145141043 ms
>
> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Finished job streaming job
> 145141043 ms.1 from job set of time 145141043 ms
>
> 15/12/29 11:33:50 INFO rdd.BlockRDD: Removing RDD 120956 from persistence
> list
>
> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Total delay: 0.003 s for
> time 145141043 ms (execution: 0.000 s)
>
> 15/12/29 11:33:50 INFO storage.BlockManager: Removing RDD 120956
>
> 15/12/29 11:33:50 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[120956] at createStream at SparkStreamingEngine.java:40 of time
> 145141043 ms
>
> 15/12/29 11:33:50 INFO rdd.MapPartitionsRDD: Removing RDD 120957 from
> persistence list
>
> 15/12/29 11:33:50 INFO storage.BlockManager: Removing RDD 120957
>
> 15/12/29 11:33:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
> ArrayBuffer(145141041 ms)
>
> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Added jobs for time
> 145141044 ms
>
> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Starting job streaming job
> 145141044 ms.0 from job set of time 145141044 ms
>
> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Finished job streaming job
> 145141044 ms.0 from job set of time 145141044 ms
>
> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Starting job streaming job
> 145141044 ms.1 from job set of time 145141044 ms
>
> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Finished job streaming job
> 145141044 ms.1 from job set of time 145141044 ms
>
> 15/12/29 11:34:00 INFO rdd.BlockRDD: Removing RDD 120958 from persistence
> list
>
> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Total delay: 0.003 s for
> time 145141044 ms (execution: 0.001 s)
>
> 15/12/29 11:34:00 INFO storage.BlockManager: Removing RDD 120958
>
> 15/12/29 11:34:00 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[120958] at createStream at SparkStreamingEngine.java:40 of time
> 145141044 ms
>
> 15/12/29 11:34:00 INFO rdd.MapPartitionsRDD: Removing RDD 120959 from
> persistence list
>
> 15/12/29 11:34:00 INFO storage.BlockManager: Removing RDD 120959
>
> 15/12/29 11:34:00 INFO scheduler.ReceivedBlockTracker: Deleting batches
> ArrayBuffer(145141042 ms)
>
> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Added jobs for time
> 145141045 ms
>
> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Starting job streaming job
> 145141045 ms.0 from job set of time 145141045 ms
>
> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Finished job streaming job
> 145141045 ms.0 from job set of time 145141045 ms
>
> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Starting job streaming job
> 145141045 ms.1 from job set of time 145141045 ms
>
> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Finished job streaming job
> 145141045 ms.1 from job set of time 145141045 ms
>
> 15/12/29 11:34:10 INFO rdd.BlockRDD: Removing RDD 120960 from persistence
> list
>
> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Total delay: 0.004 s for
> time 145141045 ms (execution: 0.001 s)
>
> 15/12/29 11:34:10 INFO storage.BlockManager: Removing RDD 120960
>
> 15/12/29 11:34:10 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[120960] at createStream at SparkStreamingEngine.java:40 of time
> 145141045 ms
>
> 15/12/29 11:34:10 INFO rdd.MapPartitionsRDD: Removing RDD 120961 from
> persistence list
>
> 15/12/29 11:34:10 INFO storage.BlockManager: Removing RDD 120961
>
> 15/12/29 11:34:10 INFO scheduler.ReceivedBlockTracker: Deleting batches
> ArrayBuffer(145141043 ms)
>
> 15/12/29 11:34:13 WARN security.UserGroupInformation:
> PriviledgedActionException as:s (auth:SIMPLE)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> token (HDFS_DELEGATION_TOKEN token 3104414 for s) is expired
>
> 15/12/29 11:34:13 *WARN ipc.Client: Exception encountered while
> connecting to the server* :
> 

Re: Is there a way to use parallelize function in sparkR spark version (1.6.0)

2016-01-05 Thread Ted Yu
Please take a look at the following for examples:

R/pkg/R/RDD.R
R/pkg/R/pairRDD.R

Cheers

On Tue, Jan 5, 2016 at 2:36 AM, Chandan Verma 
wrote:

>
> ===
> DISCLAIMER: The information contained in this message (including any
> attachments) is confidential and may be privileged. If you have received it
> by mistake please notify the sender by return e-mail and permanently delete
> this message and any attachments from your system. Any dissemination, use,
> review, distribution, printing or copying of this message in whole or in
> part is strictly prohibited. Please note that e-mails are susceptible to
> change. CitiusTech shall not be liable for the improper or incomplete
> transmission of the information contained in this communication nor for any
> delay in its receipt or damage to your system. CitiusTech does not
> guarantee that the integrity of this communication has been maintained or
> that this communication is free of viruses, interceptions or interferences.
> 
>
>


Re: [discuss] dropping Python 2.6 support

2016-01-05 Thread Ted Yu
+1

> On Jan 5, 2016, at 10:49 AM, Davies Liu  wrote:
> 
> +1
> 
> On Tue, Jan 5, 2016 at 5:45 AM, Nicholas Chammas
>  wrote:
>> +1
>> 
>> Red Hat supports Python 2.6 on REHL 5 until 2020, but otherwise yes, Python
>> 2.6 is ancient history and the core Python developers stopped supporting it
>> in 2013. REHL 5 is not a good enough reason to continue support for Python
>> 2.6 IMO.
>> 
>> We should aim to support Python 2.7 and Python 3.3+ (which I believe we
>> currently do).
>> 
>> Nick
>> 
>>> On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang  wrote:
>>> 
>>> plus 1,
>>> 
>>> we are currently using python 2.7.2 in production environment.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2016-01-05 18:11:45,"Meethu Mathew"  写道:
>>> 
>>> +1
>>> We use Python 2.7
>>> 
>>> Regards,
>>> 
>>> Meethu Mathew
>>> 
 On Tue, Jan 5, 2016 at 12:47 PM, Reynold Xin  wrote:
 
 Does anybody here care about us dropping support for Python 2.6 in Spark
 2.0?
 
 Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json
 parsing) when compared with Python 2.7. Some libraries that Spark depend on
 stopped supporting 2.6. We can still convince the library maintainers to
 support 2.6, but it will be extra work. I'm curious if anybody still uses
 Python 2.6 to run Spark.
 
 Thanks.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: aggregateByKey vs combineByKey

2016-01-05 Thread Ted Yu
Looking at PairRDDFunctions.scala :

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner:
Partitioner)(seqOp: (U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
...
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
  cleanedSeqOp, combOp, partitioner)

I think the two operations should have similar performance.
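
For illustration, a minimal sketch (mirroring the word-length data in the quoted
message below) showing both calls producing the same grouping:

val kv = sc.parallelize(Seq((2, "Hi"), (1, "i"), (2, "am"), (1, "a")))

// aggregateByKey: zero value + per-partition seqOp + cross-partition combOp
val viaAggregate = kv.aggregateByKey(List[String]())(
  (acc, item) => item :: acc,
  (acc1, acc2) => acc1 ::: acc2)

// combineByKey: createCombiner + mergeValue + mergeCombiners
val viaCombine = kv.combineByKey(
  (v: String) => List(v),
  (acc: List[String], v: String) => v :: acc,
  (acc1: List[String], acc2: List[String]) => acc1 ::: acc2)

viaAggregate.collect()   // e.g. Array((1,List(a, i)), (2,List(am, Hi)))
viaCombine.collect()     // same grouping, element order may differ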

Cheers

On Tue, Jan 5, 2016 at 1:13 PM, Marco Mistroni  wrote:

> Hi all
>  i have the following dataSet
> kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,string)]
>
> It's a simple list of tuples containing (word_length, word)
>
> What i wanted to do was to group the result by key in order to have a
> result in the form
>
> [(word_length_1, [word1, word2, word3], word_length_2, [word4, word5,
> word6])
>
> so i browsed spark API and was able to get the result i wanted using two
> different
> functions
> .
>
> scala> kv.combineByKey(List(_), (x:List[String], y:String) => y :: x,
> (x:List[St
>
> ring], y:List[String]) => x ::: y).collect()
>
> res86: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi,
> am)), (4,L
> ist(test)), (6,List(string)))
>
> and
>
> scala>
>
> scala> kv.aggregateByKey(List[String]())((acc, item) => item :: acc,
>
>  |(acc1, acc2) => acc1 ::: acc2).collect()
>
>
>
>
>
>
>
> res87: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi,
> am)), (4,L
> ist(test)), (6,List(string)))
>
> Now, question is: any advantages of using one instead of the others?
> Am i somehow misusing the API for what i want to do?
>
> kind regards
>  marco
>
>
>
>
>
>
>
>


Re: Negative Number of Active Tasks in Spark UI

2016-01-05 Thread Ted Yu
Which version of Spark do you use ?

This might be related:
https://issues.apache.org/jira/browse/SPARK-8560

Do you use dynamic allocation ?

Cheers

> On Jan 4, 2016, at 10:05 PM, Prasad Ravilla  wrote:
> 
> I am seeing negative active tasks in the Spark UI.
> 
> Is anyone seeing this?
> How is this possible?
> 
> Thanks,
> Prasad.
> 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: problem building spark on centos

2016-01-05 Thread Ted Yu
Which version of maven are you using ?

It should be 3.3.3+

On Tue, Jan 5, 2016 at 4:54 PM, Jade Liu  wrote:

> Hi, All:
>
> I’m trying to build spark 1.5.2 from source using maven with the following
> command:
>
> ./make-distribution.sh --tgz -Phadoop-2.6 -Pyarn -Dhadoop.version=2.6.0
> -Dscala-2.11 -Phive -Phive-thriftserver –DskipTests
>
>
> I got the following error:
>
> + VERSION='[ERROR] [Help 2]
> http://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException
> '
>
>
> When I try:
>
> build/mvn -X -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean
> package
>
>
> I got the following error:
>
> [FATAL] Non-resolvable parent POM for
> org.apache.spark:spark-parent_2.11:1.5.2: Could not transfer artifact
> org.apache:apache:pom:14 from/to central (https://repo1.maven.org/maven2):
> java.security.ProviderException: java.security.KeyException and
> 'parent.relativePath' points at wrong local POM @ line 22, column 11
>
>
> Does anyone know how to change the settings in maven to fix this?
>
>
> Thanks in advance,
>
>
> Jade
>


Re: How to concat few rows into a new column in dataframe

2016-01-05 Thread Ted Yu
Something like the following:

val zeroValue = collection.mutable.Set[String]()

val aggregated = data.aggregateByKey(zeroValue)((set, v) => set += v,
(setOne, setTwo) => setOne ++= setTwo)
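
A slightly fuller sketch of the same idea, assuming the DataFrame has the "id"
and "name" columns from the question (treating id as an Int is an assumption):

// pull (id, name) pairs out of the DataFrame
val pairs = data.rdd.map(r => (r.getAs[Int]("id"), r.getAs[String]("name")))

val grouped = pairs.aggregateByKey(collection.mutable.Set[String]())(
  (set, v) => set += v,
  (s1, s2) => s1 ++= s2)

grouped.collect().foreach(println)   // e.g. (1,Set(abc, bdf)), (2,Set(ab, cd))

On newer releases, df.groupBy("id").agg(collect_list("name")) is another option
(on 1.x, collect_list may require HiveContext).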

On Tue, Jan 5, 2016 at 2:46 PM, Gavin Yue  wrote:

> Hey,
>
> For example, a table df with two columns
> id  name
> 1   abc
> 1   bdf
> 2   ab
> 2   cd
>
> I want to group by the id and concat the string into array of string. like
> this
>
> id
> 1 [abc,bdf]
> 2 [ab, cd]
>
> How could I achieve this in dataframe?  I stuck on df.groupBy("id"). ???
>
> Thanks
>
>


Re: HiveThriftServer fails to quote strings

2016-01-04 Thread Ted Yu
bq. without any of the escape characters:

Did you intend to show a sample ?

As far as I can tell, there was no sample or image in the previous email.

FYI

On Mon, Jan 4, 2016 at 11:36 AM, sclyon  wrote:

> Hello all,
>
> I've got a nested JSON structure in parquet format that I'm having some
> issues with when trying to query it through Hive.
>
> In Spark (1.5.2) the column is represented correctly:
>
>
> However, when queried from Hive I get the same column but without any of
> the
> escape characters:
>
>
> Naturally this breaks my JSON parsers and I'm unable to use the data. Has
> anyone encountered this error before? I tried looking through the source
> but
> all I can find that I think is related is the HiveContext.toHiveString
> method.
>
> Any advice would be appreciated!
>
> Thanks,
> Scott
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-fails-to-quote-strings-tp25877.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is Spark 1.6 released?

2016-01-04 Thread Ted Yu
Please refer to the following:

https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets
https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets
https://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
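
For a quick feel of the 1.6 Dataset API described there, a minimal sketch (the
case class and values are just examples; the json path is the sample file that
ships with the Spark distribution):

case class Person(name: String, age: Long)

import sqlContext.implicits._   // sqlContext as provided by spark-shell

// Seq -> Dataset
val ds = Seq(Person("Andy", 32)).toDS()
ds.map(_.name).collect()        // Array(Andy)

// DataFrame -> Dataset
val people = sqlContext.read
  .json("examples/src/main/resources/people.json")
  .as[Person]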

Cheers

On Mon, Jan 4, 2016 at 11:59 AM,  wrote:

> Where can I read more about the dataset api on a user layer? I am failing
> to get an API doc or understand when to use DataFrame or DataSet,
> advantages, etc.
>
> Thanks,
> Saif
>
> -Original Message-
> From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
> Sent: Monday, January 04, 2016 2:01 PM
> To: user@spark.apache.org
> Subject: Re: Is Spark 1.6 released?
>
> It's now OK: Michael published and announced the release.
>
> Sorry for the delay.
>
> Regards
> JB
>
> On 01/04/2016 10:06 AM, Jung wrote:
> > Hi
> > There were Spark 1.6 jars in maven central and github.
> > I found it 5 days ago. But it doesn't appear on Spark website now.
> > May I regard Spark 1.6 zip file in github as a stable release?
> >
> > Thanks
> > Jung
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Monitor Job on Yarn

2016-01-04 Thread Ted Yu
Please look at the history-server-related content under:
https://spark.apache.org/docs/latest/running-on-yarn.html

Note spark.yarn.historyServer.address
FYI

On Mon, Jan 4, 2016 at 2:49 PM, Daniel Valdivia 
wrote:

> Hello everyone, happy new year,
>
> I submitted an app to yarn, however I'm unable to monitor it's progress on
> the driver node, not in :8080 or :4040 as
> documented, when submitting to the standalone mode I could monitor however
> seems liek its not the case right now.
>
> I submitted my app this way:
>
> spark-submit --class my.class --master yarn --deploy-mode cluster myjar.jar
>
> and so far the job is on it's way it seems, the console is vivid with
> Application report messages, however I can't access the status of the app,
> should I have submitted the app in a different fashion to access the status
> of it?
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: groupByKey does not work?

2016-01-04 Thread Ted Yu
Can you give a bit more information ?

Release of Spark you're using
Minimal dataset that shows the problem
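
For reference, on a minimal pair RDD groupByKey yields one record per distinct
key (a sketch, not the poster's data):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4)))
pairs.groupByKey().collect()
// Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3, 4)))

If the keys are arrays, or custom classes without consistent equals/hashCode,
identical-looking keys can land in separate groups, which would explain the
symptom.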

Cheers

On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra  wrote:

> I tried groupByKey and noticed that it did not group all values into the
> same group.
>
> In my test dataset (a Pair rdd) I have 16 records, where there are only 4
> distinct keys, so I expected there to be 4 records in the groupByKey
> object, but instead there were 8. Each of the 4 distinct keys appear 2
> times.
>
> Is this the expected behavior? I need to be able to get ALL values
> associated with each key grouped into a SINGLE record. Is it possible?
>
> Arun
>
> p.s. reducebykey will not be sufficient for me
>


Re: Apparent bug in KryoSerializer

2015-12-31 Thread Ted Yu
For your second question,

bq. Class is not registered: scala.Tuple3[]

The above IllegalArgumentException states the class that needs to be
registered.
Meaning the types of the components in the tuple are insignificant.

BTW what Spark release are you using ?
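
For reference, a minimal sketch of registering those classes up front via
SparkConf (the class list is taken from the error messages; everything else is
illustrative):

val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // "Class is not registered" typically appears when registration is required
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[scala.collection.mutable.WrappedArray.ofInt],
    classOf[Array[Int]],
    classOf[Array[Tuple3[_, _, _]]]))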

Cheers

On Thu, Dec 31, 2015 at 9:49 AM, Russ  wrote:

> The ScalaTest code that is enclosed at the end of this email message
> demonstrates what appears to be a bug in the KryoSerializer.  This code was
> executed from IntelliJ IDEA (community edition) under Mac OS X 10.11.2
>
> The KryoSerializer is enabled by updating the original SparkContext  (that
> is supplied by the ScalaTest) via:
>
> 1. reading the SparkConf from the SparkContext,
> 2. updating the SparkConf to enable the KryoSerializer,
> 3. stopping the original SparkContext, and
> 4. creating a new SparkContext from the updated SparkConf.
>
> Following enabling of the KryoSerializer, execution of the following line
> (line 56):
>
> val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter => 
> Array(iter.size).iterator, true).collect
>
> threw the following three instances of IllegalArgumentException:
>
> java.lang.IllegalArgumentException: Class is not registered:
> scala.collection.mutable.WrappedArray$ofInt
> java.lang.IllegalArgumentException: Class is not registered: int[]
> java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
>
> which prompted registration of the following three classes with the
> KryoSerializer via the SparkConf.registerKryoClasses() method:
>
> classOf[scala.collection.mutable.WrappedArray.ofInt],
> classOf[Array[Int]],
> classOf[Array[Tuple3[_, _, _]]]
>
> Following registration of these three classes with the KryoSerializer, the
> above-indicated 'val rddPartitionsSizes...' line (line 56) executed without
> throwing an IllegalArgumentException.
>
> However, execution of the following line (line 59):
>
> val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter => 
> Array(iter.size).iterator, true).collect
>
> threw the following SparkException:
>
> Task not serializable
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1847)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
> at
> kryo.KryoSerializerTest$$anonfun$1.apply$mcV$sp(KryoSerializerTest.scala:59)
> at
> kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
> at
> kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
> at
> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
> at org.scalatest.Transformer.apply(Transformer.scala:20)
> at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
> at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
> at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
> at
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
> at
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
> at
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
> at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
> at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
> at
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
> at
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
> at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
> at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> at org.scalatest.SuperEngine.org
> $scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
> at 

Re: pass custom spark-conf

2015-12-31 Thread Ted Yu
Check out --conf option for spark-submit

bq. to configure different hdfs-site.xml

What config parameters do you plan to change in hdfs-site.xml ?
If the parameter only affects hdfs NN / DN, passing hdfs-site.xml wouldn't
take effect, right ?

Cheers

On Thu, Dec 31, 2015 at 10:48 AM, KOSTIANTYN Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> Hi all,
>
> I'm trying to use different spark-default.conf per user, i.e. I want to
> have spark-user1.conf and etc. Is it a way to pass a path to appropriate
> conf file when I'm using standalone spark installation?
> Also, is it possible to configure different hdfs-site.xml and pass it as
> well with spark-submit?
>
> thank you in advance
>
> Happy holidays!
>
>
> /Kostia
>


Re: difference between ++ and Union of a RDD

2015-12-29 Thread Ted Yu
From RDD.scala :

  def ++(other: RDD[T]): RDD[T] = withScope {
this.union(other)

They should be the same.
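
One related point worth noting (a small sketch with made-up lists): neither
form removes duplicates, so counts include repeated elements unless distinct is
applied:

val a = sc.parallelize(List(1, 3, 5))
val b = sc.parallelize(List(1, 3, 13))

(a ++ b).count()             // 6
a.union(b).count()           // 6 -- identical to ++
a.union(b).distinct.count()  // 4 -- duplicates only go away with distinct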

On Tue, Dec 29, 2015 at 10:41 AM, email2...@gmail.com 
wrote:

> Hello All -
>
> tried couple of operations by using ++ and union on RDD's but realized that
> the end results are same. Do you know any differences?.
>
> val odd_partA  = List(1,3,5,7,9,11,1,3,5,7,9,11,1,3,5,7,9,11)
> odd_partA: List[Int] = List(1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11, 1, 3, 5,
> 7, 9, 11)
>
> val odd_partB  = List(1,3,13,15,9)
> odd_partB: List[Int] = List(1, 3, 13, 15, 9)
>
> val odd_partC  = List(15,9,1,3,13)
> odd_partC: List[Int] = List(15, 9, 1, 3, 13)
>
> val odd_partA_RDD = sc.parallelize(odd_partA)
> odd_partA_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at
> parallelize at :17
>
> val odd_partB_RDD = sc.parallelize(odd_partB)
> odd_partB_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at
> parallelize at :17
>
> val odd_partC_RDD = sc.parallelize(odd_partC)
> odd_partC_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at
> parallelize at :17
>
> val odd_PARTAB_pp = odd_partA_RDD ++(odd_partB_RDD)
> odd_PARTAB_pp: org.apache.spark.rdd.RDD[Int] = UnionRDD[12] at $plus$plus
> at
> :23
>
> val odd_PARTAB_union = odd_partA_RDD.union(odd_partB_RDD)
> odd_PARTAB_union: org.apache.spark.rdd.RDD[Int] = UnionRDD[13] at union at
> :23
>
> odd_PARTAB_pp.count
> res8: Long = 23
>
> odd_PARTAB_union.count
> res9: Long = 23
>
> val odd_PARTABC_pp = odd_partA_RDD ++(odd_partB_RDD) ++ (odd_partC_RDD)
> odd_PARTABC_pp: org.apache.spark.rdd.RDD[Int] = UnionRDD[15] at $plus$plus
> at :27
>
> val odd_PARTABC_union =
> odd_partA_RDD.union(odd_partB_RDD).union(odd_partC_RDD)
> odd_PARTABC_union: org.apache.spark.rdd.RDD[Int] = UnionRDD[17] at union at
> :27
>
> odd_PARTABC_pp.count
> res10: Long = 28
>
> odd_PARTABC_union.count
> res11: Long = 28
>
> Thanks
> Gokul
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/difference-between-and-Union-of-a-RDD-tp25830.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Task hang problem

2015-12-29 Thread Ted Yu
Can you log onto 10.65.143.174 , find task 31 and take a stack trace ?

Thanks

On Tue, Dec 29, 2015 at 9:19 AM, Darren Govoni  wrote:

> Hi,
>   I've had this nagging problem where a task will hang and the entire job
> hangs. Using pyspark. Spark 1.5.1
>
> The job output looks like this, and hangs after the last task:
>
> ..
> 15/12/29 17:00:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on 10.65.143.174:34385 (size: 5.8 KB, free: 2.1 GB)
> 15/12/29 17:00:39 INFO TaskSetManager: Finished task 15.0 in stage 0.0
> (TID 15) in 11668 ms on 10.65.143.174 (29/32)
> 15/12/29 17:00:39 INFO TaskSetManager: Finished task 23.0 in stage 0.0
> (TID 23) in 11684 ms on 10.65.143.174 (30/32)
> 15/12/29 17:00:39 INFO TaskSetManager: Finished task 7.0 in stage 0.0
> (TID 7) in 11717 ms on 10.65.143.174 (31/32)
> {nothing here for a while, ~6mins}
>
>
> Here is the executor status, from UI.
>
> 31 31 0 RUNNING PROCESS_LOCAL 2 / 10.65.143.174 2015/12/29 17:00:28 6.8
> min 0 ms 0 ms 60 ms 0 ms 0 ms 0.0 B
> Here is executor 2 from 10.65.143.174. Never see task 31 get to the
> executor.any ideas?
>
> .
> 15/12/29 17:00:38 INFO TorrentBroadcast: Started reading broadcast
> variable 0
> 15/12/29 17:00:38 INFO MemoryStore: ensureFreeSpace(5979) called with
> curMem=0, maxMem=2223023063
> 15/12/29 17:00:38 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 5.8 KB, free 2.1 GB)
> 15/12/29 17:00:38 INFO TorrentBroadcast: Reading broadcast variable 0
> took 208 ms
> 15/12/29 17:00:38 INFO MemoryStore: ensureFreeSpace(8544) called with
> curMem=5979, maxMem=2223023063
> 15/12/29 17:00:38 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 8.3 KB, free 2.1 GB)
> 15/12/29 17:00:39 INFO PythonRunner: Times: total = 913, boot = 747, init
> = 166, finish = 0
> 15/12/29 17:00:39 INFO Executor: Finished task 15.0 in stage 0.0 (TID
> 15). 967 bytes result sent to driver
> 15/12/29 17:00:39 INFO PythonRunner: Times: total = 955, boot = 735, init
> = 220, finish = 0
> 15/12/29 17:00:39 INFO Executor: Finished task 23.0 in stage 0.0 (TID
> 23). 967 bytes result sent to driver
> 15/12/29 17:00:39 INFO PythonRunner: Times: total = 970, boot = 812, init
> = 158, finish = 0
> 15/12/29 17:00:39 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7).
> 967 bytes result sent to driver
> root@ip-10-65-143-174 2]$
>
>
>
> Sent from my Verizon Wireless 4G LTE smartphone
>


Re: Executor deregistered after 2mins (mesos, 1.6.0-rc4)

2015-12-29 Thread Ted Yu
Have you searched the log for 'f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4' ?

In the snippet you posted, I don't see registration of this Executor.

Cheers

On Tue, Dec 29, 2015 at 12:43 PM, Adrian Bridgett 
wrote:

> We're seeing an "Executor is not registered" error on a Spark (1.6.0rc4,
> mesos-0.26) cluster.  It seems as if the logic in
> MesosExternalShuffleService.scala isn't working for some reason (new in 1.6
> I believe).
>
> spark application sees this:
> ...
> 15/12/29 18:49:41 INFO MesosExternalShuffleClient: Successfully registered
> app a9344e17-f767-4b1e-a32e-e98922d6ca43-0014 with external shuffle service.
> 15/12/29 18:49:41 INFO MesosExternalShuffleClient: Successfully registered
> app a9344e17-f767-4b1e-a32e-e98922d6ca43-0014 with external shuffle service.
> 15/12/29 18:49:43 INFO CoarseMesosSchedulerBackend: Registered executor
> NettyRpcEndpointRef(null) (ip-10-1-201-165.ec2.internal:37660) with ID
> f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6
> 15/12/29 18:49:43 INFO ExecutorAllocationManager: New executor
> f02cb67a-3519-4655-b23a-edc0dd082bf1-S3/1 has registered (new total is 1)
> 15/12/29 18:49:43 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-10-1-201-165.ec2.internal:53854 with 13.8 GB RAM,
> BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6,
> ip-10-1-201-165.ec2.internal, 53854)
> 15/12/29 18:49:43 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-10-1-201-132.ec2.internal:12793 with 13.8 GB RAM,
> BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S3/1,
> ip-10-1-201-132.ec2.internal, 12793)
> 15/12/29 18:49:43 INFO ExecutorAllocationManager: New executor
> f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6 has registered (new total is 2)
> ...
> 15/12/29 18:58:06 INFO BlockManagerInfo: Added broadcast_6_piece0 in
> memory on ip-10-1-201-165.ec2.internal:53854 (size: 5.2KB, free: 13.8GB)
> 15/12/29 18:58:06 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> output locations for shuffle 1 to ip-10-1-202-121.ec2.internal:59734
> 15/12/29 18:58:06 INFO MapOutputTrackerMaster: Size of output statuses for
> shuffle 1 is 1671814 bytes
> 15/12/29 18:58:06 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> output locations for shuffle 1 to ip-10-1-201-165.ec2.internal:37660
> ...
> 15/12/29 18:58:07 INFO TaskSetManager: Starting task 63.0 in stage 1.0
> (TID 2191, ip-10-1-200-232.ec2.internal, partition 63,PROCESS_LOCAL, 2171
> bytes)
> 15/12/29 18:58:07 WARN TaskSetManager: Lost task 21.0 in stage 1.0 (TID
> 2149, ip-10-1-200-232.ec2.internal):
> FetchFailed(BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4,
> ip-10-1-202-114.ec2.internal, 7337), shuffleId=1, mapId=5, reduceId=21,
> message=
> org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
> Executor is not registered
> (appId=a9344e17-f767-4b1e-a32e-e98922d6ca43-0014,
> execId=f02cb67a-3519-4655-b23a-edc0dd082bf1-S1/4)
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:183)
> ...
> 15/12/29 18:58:07 INFO DAGScheduler: Resubmitting ShuffleMapStage 0
> (reduceByKey at
> /home/ubuntu/ajay/name-mapper/kpis/namemap_kpi_processor.py:48) and
> ShuffleMapStage 1 (reduceByKey at
> /home/ubuntu/ajay/name-mapper/kpis/namemap_kpi_processor.py:50) due to
> fetch failure
> 15/12/29 18:58:07 WARN TaskSetManager: Lost task 12.0 in stage 1.0 (TID
> 2140, ip-10-1-200-232.ec2.internal):
> FetchFailed(BlockManagerId(f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6,
> ip-10-1-201-165.ec2.internal, 7337), shuffleId=1, mapId=6, reduceId=12,
> message=
> org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
> Executor is not registered
> (appId=a9344e17-f767-4b1e-a32e-e98922d6ca43-0014,
> execId=f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6)
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:183)
>
>
> shuffle service itself (on executor's IP sees:
> 15/12/29 18:49:41 INFO MesosExternalShuffleBlockHandler: Received
> registration request from app a9344e17-f767-4b1e-a32e-e98922d6ca43-0014
> (remote address /10.1.200.165:37889).  (that's the driver IP)
> 15/12/29 18:49:43 WARN MesosExternalShuffleBlockHandler: Unknown /
> 10.1.201.165:52562 disconnected. (executor IP)
> 15/12/29 18:51:41 INFO MesosExternalShuffleBlockHandler: Application
> a9344e17-f767-4b1e-a32e-e98922d6ca43-0014 disconnected (address was /
> 10.1.200.165:37889). (driver IP again)
> 15/12/29 18:58:07 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 6244044000322436908
> java.lang.RuntimeException: Executor is not registered
> (appId=a9344e17-f767-4b1e-a32e-e98922d6ca43-0014,
> execId=f02cb67a-3519-4655-b23a-edc0dd082bf1-S9/6) (executor IP)
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:183)
>
> At first I wondered if reducing 

Re: difference between ++ and Union of a RDD

2015-12-29 Thread Ted Yu
bq. same case with sc.parallelize() or sc.makeRDD()

I think so.
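
For reference, SparkContext shows the same delegation pattern; makeRDD simply
forwards to parallelize (paraphrased from the 1.x source -- there is also an
overload of makeRDD that accepts per-element location preferences):

def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}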

On Tue, Dec 29, 2015 at 10:50 AM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Ted - Thanks for the updates. Then its the same case with sc.parallelize()
> or sc.makeRDD() right.
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
> On Tue, Dec 29, 2015 at 1:43 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> From RDD.scala :
>>
>>   def ++(other: RDD[T]): RDD[T] = withScope {
>> this.union(other)
>>
>> They should be the same.
>>
>> On Tue, Dec 29, 2015 at 10:41 AM, email2...@gmail.com <
>> email2...@gmail.com> wrote:
>>
>>> Hello All -
>>>
>>> tried couple of operations by using ++ and union on RDD's but realized
>>> that
>>> the end results are same. Do you know any differences?.
>>>
>>> val odd_partA  = List(1,3,5,7,9,11,1,3,5,7,9,11,1,3,5,7,9,11)
>>> odd_partA: List[Int] = List(1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11, 1, 3,
>>> 5,
>>> 7, 9, 11)
>>>
>>> val odd_partB  = List(1,3,13,15,9)
>>> odd_partB: List[Int] = List(1, 3, 13, 15, 9)
>>>
>>> val odd_partC  = List(15,9,1,3,13)
>>> odd_partC: List[Int] = List(15, 9, 1, 3, 13)
>>>
>>> val odd_partA_RDD = sc.parallelize(odd_partA)
>>> odd_partA_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9]
>>> at
>>> parallelize at :17
>>>
>>> val odd_partB_RDD = sc.parallelize(odd_partB)
>>> odd_partB_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10]
>>> at
>>> parallelize at :17
>>>
>>> val odd_partC_RDD = sc.parallelize(odd_partC)
>>> odd_partC_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11]
>>> at
>>> parallelize at :17
>>>
>>> val odd_PARTAB_pp = odd_partA_RDD ++(odd_partB_RDD)
>>> odd_PARTAB_pp: org.apache.spark.rdd.RDD[Int] = UnionRDD[12] at
>>> $plus$plus at
>>> :23
>>>
>>> val odd_PARTAB_union = odd_partA_RDD.union(odd_partB_RDD)
>>> odd_PARTAB_union: org.apache.spark.rdd.RDD[Int] = UnionRDD[13] at union
>>> at
>>> :23
>>>
>>> odd_PARTAB_pp.count
>>> res8: Long = 23
>>>
>>> odd_PARTAB_union.count
>>> res9: Long = 23
>>>
>>> val odd_PARTABC_pp = odd_partA_RDD ++(odd_partB_RDD) ++ (odd_partC_RDD)
>>> odd_PARTABC_pp: org.apache.spark.rdd.RDD[Int] = UnionRDD[15] at
>>> $plus$plus
>>> at :27
>>>
>>> val odd_PARTABC_union =
>>> odd_partA_RDD.union(odd_partB_RDD).union(odd_partC_RDD)
>>> odd_PARTABC_union: org.apache.spark.rdd.RDD[Int] = UnionRDD[17] at union
>>> at
>>> :27
>>>
>>> odd_PARTABC_pp.count
>>> res10: Long = 28
>>>
>>> odd_PARTABC_union.count
>>> res11: Long = 28
>>>
>>> Thanks
>>> Gokul
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/difference-between-and-Union-of-a-RDD-tp25830.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Can't submit job to stand alone cluster

2015-12-28 Thread Ted Yu
Have you verified that the following file does exist ?

/home/hadoop/git/scalaspark/./target/scala-2.10/cluster-
incidents_2.10-1.0.jar

Thanks

On Mon, Dec 28, 2015 at 3:16 PM, Daniel Valdivia 
wrote:

> Hi,
>
> I'm trying to submit a job to a small spark cluster running in stand alone
> mode, however it seems like the jar file I'm submitting to the cluster is
> "not found" by the workers nodes.
>
> I might have understood wrong, but I though the Driver node would send
> this jar file to the worker nodes, or should I manually send this file to
> each worker node before I submit the job?
>
> what I'm doing:
>
>  $SPARK_HOME/bin/spark-submit --master spark://sslabnode01:6066
> --deploy-mode cluster  --class ClusterIncidents
> ./target/scala-2.10/cluster-incidents_2.10-1.0.jar
>
> The error I'm getting:
>
> Running Spark using the REST application submission protocol.
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/12/28 15:13:58 INFO RestSubmissionClient: Submitting a request to
> launch an application in spark://sslabnode01:6066.
> 15/12/28 15:13:59 INFO RestSubmissionClient: Submission successfully
> created as driver-20151228151359-0003. Polling submission state...
> 15/12/28 15:13:59 INFO RestSubmissionClient: Submitting a request for the
> status of submission driver-20151228151359-0003 in spark://sslabnode01:6066.
> 15/12/28 15:13:59 INFO RestSubmissionClient: State of driver
> driver-20151228151359-0003 is now ERROR.
> 15/12/28 15:13:59 INFO RestSubmissionClient: Driver is running on worker
> worker-20151218150246-10.15.235.241-52077 at 10.15.235.241:52077.
> 15/12/28 15:13:59 ERROR RestSubmissionClient: Exception from the cluster:
> java.io.FileNotFoundException:
> /home/hadoop/git/scalaspark/./target/scala-2.10/cluster-incidents_2.10-1.0.jar
> (No such file or directory)
> java.io.FileInputStream.open(Native Method)
> java.io.FileInputStream.(FileInputStream.java:146)
>
> org.spark-project.guava.io.Files$FileByteSource.openStream(Files.java:124)
>
> org.spark-project.guava.io.Files$FileByteSource.openStream(Files.java:114)
> org.spark-project.guava.io.ByteSource.copyTo(ByteSource.java:202)
> org.spark-project.guava.io.Files.copy(Files.java:436)
>
> org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:514)
> org.apache.spark.util.Utils$.copyFile(Utils.scala:485)
> org.apache.spark.util.Utils$.doFetchFile(Utils.scala:562)
> org.apache.spark.util.Utils$.fetchFile(Utils.scala:369)
> org.apache.spark.deploy.worker.DriverRunner.org
> $apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:150)
>
> org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:79)
> 15/12/28 15:13:59 INFO RestSubmissionClient: Server responded with
> CreateSubmissionResponse:
> {
>   "action" : "CreateSubmissionResponse",
>   "message" : "Driver successfully submitted as
> driver-20151228151359-0003",
>   "serverSparkVersion" : "1.5.2",
>   "submissionId" : "driver-20151228151359-0003",
>   "success" : true
> }
>
> Thanks in advance
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Inconsistent behavior of randomSplit in YARN mode

2015-12-28 Thread Ted Yu
bq. the train and test have overlap in the numbers being outputted

Can the call to repartition explain the above ?

Which release of Spark are you using ?
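
If the repartition is indeed re-evaluated per action, caching before the split
(as the poster notes at the end of the quoted message) should make it
deterministic; a minimal sketch:

val rdd2 = sc.parallelize(1 to 100).repartition(64).cache()
val Array(train, test) = rdd2.randomSplit(Array(0.7, 0.3), seed = 1)

train.intersection(test).count()   // expected to be 0 once rdd2 is cached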

Thanks

On Sun, Dec 27, 2015 at 9:56 PM, Gaurav Kumar 
wrote:

> Hi,
>
> I noticed an inconsistent behavior when using rdd.randomSplit when the
> source rdd is repartitioned, but only in YARN mode. It works fine in local
> mode though.
>
> *Code:*
> val rdd = sc.parallelize(1 to 100)
> val rdd2 = rdd.repartition(64)
> rdd.partitions.size
> rdd2.partitions.size
> val Array(train, test) = *rdd2*.randomSplit(Array(70, 30), 1)
> train.takeOrdered(10)
> test.takeOrdered(10)
>
> *Master: local*
> Both the take statements produce consistent results and have no overlap in
> numbers being outputted.
>
>
> *Master: YARN*However, when these are run on YARN mode, these produce
> random results every time and also the train and test have overlap in the
> numbers being outputted.
> If I use *rdd*.randomSplit, then it works fine even on YARN.
>
> So, it concludes that the repartition is being evaluated every time the
> splitting occurs.
>
> Interestingly, if I cache the rdd2 before splitting it, then we can expect
> consistent behavior since repartition is not evaluated again and again.
>
> Best Regards,
> Gaurav Kumar
> Big Data • Data Science • Photography • Music
> +91 9953294125
>


Re: Pattern type is incompatible with expected type

2015-12-27 Thread Ted Yu
Have you tried declaring RDD[ChildTypeOne] and writing separate functions
for each sub-type ?
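
A sketch of that approach, reusing the case classes from the question (note
that even if the match compiled, erasure means RDD[ChildTypeOne] and
RDD[ChildTypeTwo] are indistinguishable at runtime):

import org.apache.spark.rdd.RDD

def handleOne(rdd: RDD[ChildTypeOne]): Unit = println("ChildTypeOne")
def handleTwo(rdd: RDD[ChildTypeTwo]): Unit = println("ChildTypeTwo")

// keep the concrete element type instead of widening to RDD[SuperType]
val rdd1: RDD[ChildTypeOne] = sc.parallelize(Seq(ChildTypeOne("a")))
val rdd2: RDD[ChildTypeTwo] = sc.parallelize(Seq(ChildTypeTwo("b")))

handleOne(rdd1)
handleTwo(rdd2)

If a single RDD[SuperType] really is required, matching on the elements
(rdd.map { case c: ChildTypeOne => ...; case c: ChildTypeTwo => ... }) is the
usual alternative.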

Cheers

On Sun, Dec 27, 2015 at 10:08 AM, pkhamutou  wrote:

> Hello,
>
> I have a such situation:
>
> abstract class SuperType {...}
> case class ChildTypeOne(x: String) extends SuperType {.}
> case class ChildTypeTwo(x: String) extends SuperType {}
>
> than I have:
>
> val rdd1: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeOne(r))
> val rdd2: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeTwo(r))
>
> but when i try to:
> def someFunction(rdd: RDD[SuperType]) = rdd match {
>   case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
>   case rdd: RDD[ChildTypeTwo] => println("ChildTypeTwo")
> }
>
>
> i get:
>
> Error:(60, 15) pattern type is incompatible with expected type;
>  found   : org.apache.spark.rdd.RDD[ChildTypeOne]
>  required: org.apache.spark.rdd.RDD[SuperType]
> Note: ChildTypeOne <: SuperType, but class RDD is invariant in type T.
> You may wish to define T as +T instead. (SLS 4.5)
>   case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
>   ^
>
> So how to work around it? Because in some situations I need to distinguish
> them.
>
> Best regards,
> Pavel Khamutou
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Pattern-type-is-incompatible-with-expected-type-tp25805.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: partitioning json data in spark

2015-12-27 Thread Ted Yu
Is upgrading to 1.5.x a possibility for you ?
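
If the upgrade is an option, 1.5.x extends partitionBy beyond Parquet for
file-based sources (json included), so something along these lines should work
there -- the column name and path below are just placeholders:

df.write
  .format("json")
  .partitionBy("someColumn")
  .mode("overwrite")
  .save("/tmp/partitioned_json")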

Cheers

On Sun, Dec 27, 2015 at 9:28 AM, Նարեկ Գալստեան 
wrote:

>
> http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>  I did try but it all was in vain.
> It is also explicitly written in api docs that it only supports Parquet.
>
> ​
>
> Narek Galstyan
>
> Նարեկ Գալստյան
>
> On 27 December 2015 at 17:52, Igor Berman  wrote:
>
>> have you tried to specify format of your output, might be parquet is
>> default format?
>> df.write().format("json").mode(SaveMode.Overwrite).save("/tmp/path");
>>
>> On 27 December 2015 at 15:18, Նարեկ Գալստեան 
>> wrote:
>>
>>> Hey all!
>>> I am willing to partition *json *data by a column name and store the
>>> result as a collection of json files to be loaded to another database.
>>>
>>> I could use spark's built-in *partitionBy *function but it only outputs
>>> in parquet format which is not desirable for me.
>>>
>>> Could you suggest me a way to deal with this problem?
>>> Narek Galstyan
>>>
>>> Նարեկ Գալստյան
>>>
>>
>>
>


Re: DataFrame Save is writing just column names while saving

2015-12-27 Thread Ted Yu
Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared in
the output of joineddf.collect.foreach(println) ?

Thanks

On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot 
wrote:

> Hi,
> I am trying to join two dataframes and am able to display the results in the
> console after the join. I am saving the joined data
> in CSV format using the spark-csv api. It is just saving the column names, not
> the data at all.
>
> Below is the sample code for the reference:
>
> spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
>> yarn-client --driver-memory 512m --executor-memory 512m
>>
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.sql.hive.orc._
>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType,FloatType ,LongType ,TimestampType };
>>
>> val firstSchema = StructType(Seq(StructField("COLUMN1", StringType,
>> true),StructField("COLUMN2", StringType, true),StructField("COLUMN2",
>> StringType, true),StructField("COLUMN3", StringType, true)
>> StructField("COLUMN4", StringType, true),StructField("COLUMN5",
>> StringType, true)))
>> val file1df =
>> hiveContext.read.format("com.databricks.spark.csv").option("header",
>> "true").schema(firstSchema).load("/tmp/File1.csv")
>>
>>
>> val secondSchema = StructType(Seq(
>> StructField("COLUMN1", StringType, true),
>> StructField("COLUMN2", NullType  , true),
>> StructField("COLUMN3", TimestampType , true),
>> StructField("COLUMN4", TimestampType , true),
>> StructField("COLUMN5", NullType , true),
>> StructField("COLUMN6", StringType, true),
>> StructField("COLUMN7", IntegerType, true),
>> StructField("COLUMN8", IntegerType, true),
>> StructField("COLUMN9", StringType, true),
>> StructField("COLUMN10", IntegerType, true),
>> StructField("COLUMN11", IntegerType, true),
>> StructField("COLUMN12", IntegerType, true)))
>>
>>
>> val file2df =
>> hiveContext.read.format("com.databricks.spark.csv").option("header",
>> "false").schema(secondSchema).load("/tmp/file2.csv")
>> val joineddf = file1df.join(file2df, file1df("COLUMN1") ===
>> file2df("COLUMN6"))
>> val selecteddata = joineddf.select(file1df("COLUMN2"),file2df("COLUMN10"))
>>
> //the below statement is printing the joined data
>
>> joineddf.collect.foreach(println)
>>
>
>
>> //this statement saves the CSVfile but only columns names mentioned above
>> on the select are being saved
>> selecteddata.write.format("com.databricks.spark.csv").option("header",
>> "true").save("/tmp/JoinedData.csv")
>>
>
>
> Would really appreciate the pointers /help.
>
> Thanks,
> Divya
>
>
>
>
>


Re: ERROR server.TThreadPoolServer: Error occurred during processing of message

2015-12-26 Thread Ted Yu
Have you seen this ?

http://stackoverflow.com/questions/30705576/python-cannot-connect-hiveserver2

On Sat, Dec 26, 2015 at 9:09 PM, Dasun Hegoda  wrote:

> I'm running apache-hive-1.2.1-bin and spark-1.5.1-bin-hadoop2.6. spark as
> the hive engine. When I try to connect through JasperStudio using thrift
> port I get below error. I'm running ubuntu 14.04.
>
> 15/12/26 23:36:20 ERROR server.TThreadPoolServer: Error occurred
> during processing of message.
> java.lang.RuntimeException:
> org.apache.thrift.transport.TSaslTransportException: No data or no sasl
> data in the stream
> at
> org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
> at
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:268)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.thrift.transport.TSaslTransportException: No
> data or no sasl data in the stream
> at
> org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:328)
> at
> org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
> at
> org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
> ... 4 more
> 15/12/26 23:36:20 INFO thrift.ThriftCLIService: Client protocol
> version: HIVE_CLI_SERVICE_PROTOCOL_V5
> 15/12/26 23:36:20 INFO session.SessionState: Created local directory:
> /tmp/c670ff55-01bb-4f6f-a375-d22a13c44eaf_resources
> 15/12/26 23:36:20 INFO session.SessionState: Created HDFS directory:
> /tmp/hive/anonymous/c670ff55-01bb-4f6f-a375-d22a13c44eaf
> 15/12/26 23:36:20 INFO session.SessionState: Created local directory:
> /tmp/hduser/c670ff55-01bb-4f6f-a375-d22a13c44eaf
> 15/12/26 23:36:20 INFO session.SessionState: Created HDFS directory:
> /tmp/hive/anonymous/c670ff55-01bb-4f6f-a375-d22a13c44eaf/_tmp_space.db
> 15/12/26 23:36:20 INFO thriftserver.SparkExecuteStatementOperation:
> Running query 'use default' with d842cd88-2fda-42b2-b943-468017e95f37
> 15/12/26 23:36:20 INFO parse.ParseDriver: Parsing command: use default
> 15/12/26 23:36:20 INFO parse.ParseDriver: Parse Completed
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO parse.ParseDriver: Parsing command: use default
> 15/12/26 23:36:20 INFO parse.ParseDriver: Parse Completed
> 15/12/26 23:36:20 INFO log.PerfLogger:  start=1451190980590 end=1451190980591 duration=1
> from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO metastore.HiveMetaStore: 2: get_database:
> default
> 15/12/26 23:36:20 INFO HiveMetaStore.audit: ugi=hduser
> ip=unknown-ip-addr cmd=get_database: default
> 15/12/26 23:36:20 INFO metastore.HiveMetaStore: 2: Opening raw store
> with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
> 15/12/26 23:36:20 INFO metastore.ObjectStore: ObjectStore, initialize
> called
> 15/12/26 23:36:20 INFO DataNucleus.Query: Reading in results for query
> "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used
> is closing
> 15/12/26 23:36:20 INFO metastore.MetaStoreDirectSql: Using direct SQL,
> underlying DB is DERBY
> 15/12/26 23:36:20 INFO metastore.ObjectStore: Initialized ObjectStore
> 15/12/26 23:36:20 INFO ql.Driver: Semantic Analysis Completed
> 15/12/26 23:36:20 INFO log.PerfLogger:  method=semanticAnalyze start=1451190980592 end=1451190980620 duration=28
> from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO ql.Driver: Returning Hive schema:
> Schema(fieldSchemas:null, properties:null)
> 15/12/26 23:36:20 INFO log.PerfLogger:  start=1451190980588 end=1451190980621 duration=33
> from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO ql.Driver: Concurrency mode is disabled, not
> creating a lock manager
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO ql.Driver: Starting
> command(queryId=hduser_20151226233620_6bc633ef-5c6f-49e4-9300-f79fdf0c357b):
> use default
> 15/12/26 23:36:20 INFO log.PerfLogger:  start=1451190980588 end=1451190980622 duration=34
> from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.Driver>
> 15/12/26 23:36:20 INFO log.PerfLogger:  method=task.DDL.Stage-0 

Re: error while defining custom schema in Spark 1.5.0

2015-12-25 Thread Ted Yu
The error was due to the "blank" field being defined twice.
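
Also, the List-style construction needs a terminating Nil (otherwise Scala
reports "value :: is not a member of StructField"), and the comma-separated
form needs to be wrapped in Seq(...), as in the first two (working) attempts.
A minimal sketch of the :: form, using the fields from the question:

val customSchema = StructType(
  StructField("year", IntegerType, true) ::
  StructField("make", StringType, true) ::
  StructField("model", StringType, true) ::
  StructField("comment", StringType, true) ::
  StructField("blank", StringType, true) :: Nil)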

On Tue, Dec 22, 2015 at 12:03 AM, Divya Gehlot 
wrote:

> Hi,
> I am new bee to Apache Spark ,using  CDH 5.5 Quick start VM.having spark
> 1.5.0.
> I working on custom schema and getting error
>
> import org.apache.spark.sql.hive.HiveContext
>>>
>>> scala> import org.apache.spark.sql.hive.orc._
>>> import org.apache.spark.sql.hive.orc._
>>>
>>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>>> StringType, IntegerType};
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>>> IntegerType}
>>>
>>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> 15/12/21 23:41:53 INFO hive.HiveContext: Initializing execution hive,
>>> version 1.1.0
>>> 15/12/21 23:41:53 INFO client.ClientWrapper: Inspected Hadoop version:
>>> 2.6.0-cdh5.5.0
>>> 15/12/21 23:41:53 INFO client.ClientWrapper: Loaded
>>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
>>> hiveContext: org.apache.spark.sql.hive.HiveContext =
>>> org.apache.spark.sql.hive.HiveContext@214bd538
>>>
>>> scala> val customSchema = StructType(Seq(StructField("year",
>>> IntegerType, true),StructField("make", StringType,
>>> true),StructField("model", StringType, true),StructField("comment",
>>> StringType, true),StructField("blank", StringType, true)))
>>> customSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(year,IntegerType,true),
>>> StructField(make,StringType,true), StructField(model,StringType,true),
>>> StructField(comment,StringType,true), StructField(blank,StringType,true))
>>>
>>> scala> val customSchema = (new StructType).add("year", IntegerType,
>>> true).add("make", StringType, true).add("model", StringType,
>>> true).add("comment", StringType, true).add("blank", StringType, true)
>>> customSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(year,IntegerType,true),
>>> StructField(make,StringType,true), StructField(model,StringType,true),
>>> StructField(comment,StringType,true), StructField(blank,StringType,true))
>>>
>>> scala> val customSchema = StructType( StructField("year", IntegerType,
>>> true) :: StructField("make", StringType, true) :: StructField("model",
>>> StringType, true) :: StructField("comment", StringType, true) ::
>>> StructField("blank", StringType, true)::StructField("blank", StringType,
>>> true))
>>> :24: error: value :: is not a member of
>>> org.apache.spark.sql.types.StructField
>>>val customSchema = StructType( StructField("year", IntegerType,
>>> true) :: StructField("make", StringType, true) :: StructField("model",
>>> StringType, true) :: StructField("comment", StringType, true) ::
>>> StructField("blank", StringType, true)::StructField("blank", StringType,
>>> true))
>>>
>>
> Tried like like below also
>
> scala> val customSchema = StructType( StructField("year", IntegerType,
> true), StructField("make", StringType, true) ,StructField("model",
> StringType, true) , StructField("comment", StringType, true) ,
> StructField("blank", StringType, true),StructField("blank", StringType,
> true))
> :24: error: overloaded method value apply with alternatives:
>   (fields:
> Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
> 
>   (fields:
> java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
> 
>   (fields:
> Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
>  cannot be applied to (org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField)
>val customSchema = StructType( StructField("year", IntegerType,
> true), StructField("make", StringType, true) ,StructField("model",
> StringType, true) , StructField("comment", StringType, true) ,
> StructField("blank", StringType, true),StructField("blank", StringType,
> true))
>   ^
>Would really appreciate if somebody share the example which works with
> Spark 1.4 or Spark 1.5.0
>
> Thanks,
> Divya
>
> ^
>


Re: Stuck with DataFrame df.select("select * from table");

2015-12-25 Thread Ted Yu
DataFrame uses a different syntax from a SQL query.
I searched unit tests but didn't find any in the form of df.select("select
...")

Looks like you should use sqlContext as other people suggested.
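
In other words (a small sketch using a column name from the thread; sqlContext
stands for whatever SQLContext/HiveContext created the DataFrame):

// DataFrame DSL: select takes column names / Column expressions, not a SQL string
df.select(df("x1"), df("x2")).where(df("x1") === 3.0).show(5)

// full SQL goes through the SQLContext after registering a temp table
df.registerTempTable("tmptable")
sqlContext.sql("select * from tmptable where x1 = '3.0'").show(5)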

On Fri, Dec 25, 2015 at 8:29 AM, Eugene Morozov 
wrote:

> Thanks for the comments, although the issue is not in limit() predicate.
> It's something with spark being unable to resolve the expression.
>
> I can do smth like this. It works as it suppose to:
>  df.select(df.col("*")).where(df.col("x1").equalTo(3.0)).show(5);
>
> But I think old fashioned sql style have to work also. I have
> df.registeredTempTable("tmptable") and then
>
> df.select("select * from tmptable where x1 = '3.0'").show();
>
> org.apache.spark.sql.AnalysisException: cannot resolve 'select * from tmp
> where x1 = '1.0'' given input columns x1, x4, x5, x3, x2;
>
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.sca
>
>
> From the first statement I conclude that my custom datasource is perfectly
> fine.
> Just wonder how to fix / workaround that.
> --
> Be well!
> Jean Morozov
>
> On Fri, Dec 25, 2015 at 6:13 PM, Igor Berman 
> wrote:
>
>> sqlContext.sql("select * from table limit 5").show() (not sure if limit 5
>> supported)
>>
>> or use Dmitriy's solution. select() defines your projection when you've
>> specified entire query
>>
>> On 25 December 2015 at 15:42, Василец Дмитрий 
>> wrote:
>>
>>> hello
>>> you can try to use df.limit(5).show()
>>> just trick :)
>>>
>>> On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
 Hello, I'm basically stuck as I have no idea where to look;

 Following simple code, given that my Datasource is working gives me an
 exception.

 DataFrame df = sqlc.load(filename, 
 "com.epam.parso.spark.ds.DefaultSource");
 df.cache();
 df.printSchema();   <-- prints the schema perfectly fine!

 df.show();  <-- Works perfectly fine (shows table with 
 20 lines)!
 df.registerTempTable("table");
 df.select("select * from table limit 5").show(); <-- gives weird exception

 Exception is:

 AnalysisException: cannot resolve 'select * from table limit 5' given 
 input columns VER, CREATED, SOC, SOCC, HLTC, HLGTC, STATUS

 I can do a collect on a dataframe, but cannot select any specific
 columns either "select * from table" or "select VER, CREATED from table".

 I use spark 1.5.2.
 The same code perfectly works through Zeppelin 0.5.5.

 Thanks.
 --
 Be well!
 Jean Morozov

>>>
>>>
>>
>


Re: error in spark cassandra connector

2015-12-24 Thread Ted Yu
Mind providing a bit more detail ?

Release of Spark
version of Cassandra connector
How job was submitted
complete stack trace

Thanks

On Thu, Dec 24, 2015 at 2:06 AM, Vijay Kandiboyina  wrote:

> java.lang.NoClassDefFoundError:
> com/datastax/spark/connector/rdd/CassandraTableScanRDD
>
>


Re: How to contribute by picking up starter bugs

2015-12-24 Thread Ted Yu
You can send out a pull request for the JIRA you're interested in.

Start the title of the pull request with:
[SPARK-XYZ] ...

where XYZ is the JIRA number.

The pull request would be posted on the JIRA.
After the pull request is reviewed, tested by QA, and merged, the committer
would assign your name to the JIRA.

Cheers

On Thu, Dec 24, 2015 at 2:44 AM, lokeshkumar  wrote:

> Hi
>
> From the how to contribute page of spark jira project I came to know that I
> can start by picking up the starter label bugs.
> But who will assign me these bugs? Or should I just fix them and create a
> pull request.
>
> Will be glad to help the project.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-contribute-by-picking-up-starter-bugs-tp25795.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: rdd split into new rdd

2015-12-23 Thread Ted Yu
bq. {a=1, b=1, c=2, d=2}

Can you elaborate your criteria a bit more ? The above seems to be a Set,
not a Map.

Cheers

On Wed, Dec 23, 2015 at 7:11 AM, Yasemin Kaya  wrote:

> Hi,
>
> I have data
> *JavaPairRDD> *format. In example:
>
> *(1610, {a=1, b=1, c=2, d=2}) *
>
> I want to get
> *JavaPairRDD* In example:
>
>
> *(1610, {a, b})*
> *(1610, {c, d})*
>
> Is there a way to solve this problem?
>
> Best,
> yasemin
> --
> hiç ender hiç
>


Re: error creating custom schema

2015-12-23 Thread Ted Yu
Looks like a comma was missing after "C1"
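
(A comma also appears to be missing after the COLUMN17 line.) A trimmed-down
sketch of the corrected construction, showing only a few of the columns:

val loandepoSchema = StructType(Seq(
  StructField("C1", StringType, true),        // <- comma added here
  StructField("COLUMN2", StringType, true),
  // ...
  StructField("COLUMN17", StringType, true),  // <- and a comma is needed here too
  StructField("COLUMN18", StringType, true)))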

Cheers

> On Dec 23, 2015, at 1:47 AM, Divya Gehlot  wrote:
> 
> Hi,
> I am trying to create custom schema but its throwing below error 
> 
> 
>> scala> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.sql.hive.HiveContext
>> 
>> scala> import org.apache.spark.sql.hive.orc._
>> import org.apache.spark.sql.hive.orc._
>> 
>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> 15/12/23 04:42:09 WARN SparkConf: The configuration key 
>> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3 
>> and and may be removed in the future. Please use the new key 
>> 'spark.yarn.am.waitTime' instead.
>> 15/12/23 04:42:09 INFO HiveContext: Initializing execution hive, version 
>> 0.13.1
>> hiveContext: org.apache.spark.sql.hive.HiveContext = 
>> org.apache.spark.sql.hive.HiveContext@3ca50ddf
>> 
>> scala> import org.apache.spark.sql.types.{StructType, StructField, 
>> StringType, IntegerType,FloatType ,LongType ,TimestampType };
>> import org.apache.spark.sql.types.{StructType, StructField, StringType, 
>> IntegerType, FloatType, LongType, TimestampType}
>> 
>> scala> val loandepoSchema = StructType(Seq(
>>  | StructField("C1" StringType, true),
>>  | StructField("COLUMN2", StringType , true),
>>  | StructField("COLUMN3", StringType, true),
>>  | StructField("COLUMN4", StringType, true),
>>  | StructField("COLUMN5", StringType , true),
>>  | StructField("COLUMN6", StringType, true),
>>  | StructField("COLUMN7", StringType, true),
>>  | StructField("COLUMN8", StringType, true),
>>  | StructField("COLUMN9", StringType, true),
>>  | StructField("COLUMN10", StringType, true),
>>  | StructField("COLUMN11", StringType, true),
>>  | StructField("COLUMN12", StringType, true),
>>  | StructField("COLUMN13", StringType, true),
>>  | StructField("COLUMN14", StringType, true),
>>  | StructField("COLUMN15", StringType, true),
>>  | StructField("COLUMN16", StringType, true),
>>  | StructField("COLUMN17", StringType, true)
>>  | StructField("COLUMN18", StringType, true),
>>  | StructField("COLUMN19", StringType, true),
>>  | StructField("COLUMN20", StringType, true),
>>  | StructField("COLUMN21", StringType, true),
>>  | StructField("COLUMN22", StringType, true)))
>> :25: error: value StringType is not a member of String
>>StructField("C1" StringType, true),
>> ^
> 
> Would really appreciate the guidance/pointers.
> 
> Thanks,
> Divya 


Re: Classification model method not found

2015-12-22 Thread Ted Yu
Looks like you should define a ctor for ExtendedLR which accepts a String (the
uid).
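
Params.defaultCopy (which CrossValidator reaches through copy(), per the stack
trace) reflectively calls getConstructor(classOf[String]), so that single-String
constructor must exist. A sketch of the usual pattern -- extending
LogisticRegression and the uid prefix here are assumptions, not the poster's
actual class hierarchy:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.util.Identifiable

class ExtendedLR(uid: String) extends LogisticRegression(uid) {
  // this is the constructor defaultCopy looks up via reflection
  def this() = this(Identifiable.randomUID("extendedLR"))
}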

Cheers

On Tue, Dec 22, 2015 at 1:04 PM, njoshi  wrote:

> Hi,
>
> I have a custom extended LogisticRegression model which I want to test
> against a parameter grid search. I am running as follows:
>
> /
>val exLR = new ExtendedLR()
>   .setMaxIter(100)
>   .setFitIntercept(true)
>
> /*
>  * Cross Validator parameter grid
>  */
> val paramGrid = new ParamGridBuilder()
>   .addGrid(exLR.regParam, Array(1e-7, 1e-6, 1e-5, 1e-4, 1e-3, 2e-3,
> 1e-2, 1e-1, 0.001341682))
>   .addGrid(exLR.elasticNetParam, Array(0.95))
>   .build()
>
>
> /*
>  * Perform cross validation over the parameters
>  */
> val cv = new CrossValidator()
>   .setEstimator(exLR)
>   .setEvaluator(new BinaryClassificationEvaluator)
>   .setEstimatorParamMaps(paramGrid)
>   .setNumFolds(10)
>
> /*
>  * Run the grid search and pick up the best model
>  */
> val bestModel = cv.fit(trainingData)
>  .bestModel.asInstanceOf[ExtendedLRModel]
> /
>
> While run individually  (exLR.fit(trainingData) way)  it works fine, the
> crossValidation code produces the following error.
>
> /
> java.lang.NoSuchMethodException:
> org.apache.spark.ml.classification.ExtendedLR.(java.lang.String)
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getConstructor(Class.java:1825)
> at
> org.apache.spark.ml.param.Params$class.defaultCopy(params.scala:529)
> at org.apache.spark.ml.PipelineStage.defaultCopy(Pipeline.scala:37)
> at
>
> org.apache.spark.ml.classification.ExtendedLR.copy(FactorizationMachine.scala:434)
> at
>
> org.apache.spark.ml.classification.ExtendedLR.copy(FactorizationMachine.scala:156)
> at org.apache.spark.ml.Estimator.fit(Estimator.scala:59)
> at
> org.apache.spark.ml.Estimator$$anonfun$fit$1.apply(Estimator.scala:78)
> at
> org.apache.spark.ml.Estimator$$anonfun$fit$1.apply(Estimator.scala:78)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at org.apache.spark.ml.Estimator.fit(Estimator.scala:78)
> at
>
> org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:89)
> at
>
> org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:84)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at
> org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:84)
> at com.aol.advertising.ml.Driver$.main(Driver.scala:244)
> at com.aol.advertising.ml.Driver.main(Driver.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> /
>
> Is there anything, such as implicits, I need to add someplace?
> Note, *ExtendedLR* has exact same inheritance tree.
>
> Thanks in advance,
> Nikhil
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Classification-model-init-method-not-found-tp25770.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Can SqlContext be used inside mapPartitions

2015-12-22 Thread Ted Yu
bq. be able to lookup from inside MapPartitions based on a key

Please describe your use case in a bit more detail.
One possibility is to use a NoSQL database such as HBase. There are several
choices of Spark HBase connectors.

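If the lookup data set is small enough to fit in memory, another option
(independent of SQLContext) is to load it once on the driver and broadcast it
to the executors; a rough sketch, where the path and the key/value layout are
assumptions:

val lookup = sc.textFile("hdfs:///path/to/lookup")      // placeholder path
  .map(_.split(","))
  .map(a => (a(0), a(1)))
  .collectAsMap()
val bcLookup = sc.broadcast(lookup)

val keys = sc.parallelize(Seq("k1", "k2"))              // stand-in for the real RDD
val resolved = keys.mapPartitions { iter =>
  val m = bcLookup.value
  iter.map(k => (k, m.get(k)))
}
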
Cheers

On Tue, Dec 22, 2015 at 4:51 PM, Zhan Zhang  wrote:

> SQLContext is in driver side, and I don’t think you can use it in
> executors.
> How to provide lookup functionality in executors really depends on how you
> would use them.
>
> Thanks.
>
> Zhan Zhang
>
> On Dec 22, 2015, at 4:44 PM, SRK  wrote:
>
> > Hi,
> >
> > Can SQL Context be used inside mapPartitions? My requirement is to
> register
> > a set of data from hdfs as a temp table and to be able to lookup from
> inside
> > MapPartitions based on a key. If it is not supported, is there a
> different
> > way of doing this?
> >
> > Thanks!
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-SqlContext-be-used-inside-mapPartitions-tp25771.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Which Hive version should be used with Spark 1.5.2?

2015-12-22 Thread Ted Yu
Please see SPARK-8064

On Tue, Dec 22, 2015 at 6:17 PM, Arthur Chan 
wrote:

> Hi,
>
> I plan to upgrade from 1.4.1 (+ Hive 1.1.0) to 1.5.2. Is there any upgrade
> document available, especially regarding which Hive version it should be
> upgraded to as well?
>
> Regards
>
>


Re: Stand Alone Cluster - Strange issue

2015-12-22 Thread Ted Yu
This should be related:
https://issues.apache.org/jira/browse/SPARK-4170
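
If the driver object extends scala.App (the pattern that JIRA describes), a
minimal fix is to move the body into an explicit main method; a sketch built
from the snippet below (object and app names are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object AccumMain {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("accum-test"))
    val accum = sc.accumulator(0)
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)   // the failing line
    println(accum.value)
    sc.stop()
  }
}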

On Tue, Dec 22, 2015 at 9:34 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I have a standalone cluster. One Master + One Slave. I'm getting below
> "NULL POINTER" exception.
>
> Could you please help me on this issue.
>
>
> *Code Block :-*
>  val accum = sc.accumulator(0)
> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) *==> This line
> giving exception.*
>
> Exception :-
>
> 15/12/22 09:18:26 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
> 0.0 (TID 1, 172.25.111.123): *java.lang.NullPointerException*
> at com.cc.ss.etl.Main$$anonfun$1.apply$mcVI$sp(Main.scala:25)
> at com.cc.ss.etl.Main$$anonfun$1.apply(Main.scala:25)
> at com.cc.ss.etl.Main$$anonfun$1.apply(Main.scala:25)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/12/22 09:18:26 INFO scheduler.TaskSetManager: Lost task 0.0 in stage
> 0.0 (TID 0) on executor 172.25.111.123: java.lang.NullPointerException
> (null) [duplicate 1]
> 15/12/22 09:18:26 INFO scheduler.TaskSetManager: Starting task 0.1 in
> stage 0.0 (TID 2, 172.25.111.123, PROCESS_LOCAL, 2155 bytes)
> 15/12/22 09:18:26 INFO scheduler.TaskSetManager: Starting task 1.1 in
> stage 0.0 (TID 3, 172.25.111.123, PROCESS_LOCAL, 2155 bytes)
> 15/12/22 09:18:26 WARN scheduler.TaskSetManager: Lost task 1.1 in stage
> 0.0 (TID 3, 172.25.111.123):
>
> Regards,
> Rajesh
>


Re: Regarding spark in nemory

2015-12-22 Thread Ted Yu
If I understand your question correctly, the answer is yes. 
You can retrieve rows of the rdd which are distributed across the nodes. 

> On Dec 22, 2015, at 7:34 PM, Gaurav Agarwal  wrote:
> 
> If I have a cluster of 3 nodes and Spark is running there, and I load the
> records from Phoenix into a Spark RDD and fetch the records from Spark
> through a data frame:
> 
> Now I want to know whether Spark is distributed here, i.e. if I fetch the
> records from any one of the nodes, will records residing on any other node
> in the Spark RDD also be retrieved?


Re: driver OOM due to io.netty.buffer items not getting finalized

2015-12-22 Thread Ted Yu
This might be related but the jmap output there looks different:

http://stackoverflow.com/questions/32537965/huge-number-of-io-netty-buffer-poolthreadcachememoryregioncacheentry-instances

On Tue, Dec 22, 2015 at 2:59 AM, Antony Mayi 
wrote:

> I have streaming app (pyspark 1.5.2 on yarn) that's crashing due to driver
> (jvm part, not python) OOM (no matter how big heap is assigned, eventually
> runs out).
>
> When checking the heap it is all taken by "byte" items of
> io.netty.buffer.PoolThreadCache. The number of
> io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry is constant yet the
> number of [B "bytes" keeps growing as well as the number of Finalizer
> instances. When checking the Finalizer instances it is all of
> ZipFile$ZipFileInputStream and ZipFile$ZipFileInflaterInputStream
>
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:        123556      278723776  [B
>    2:        258988       10359520  java.lang.ref.Finalizer
>    3:        174620        9778720  java.util.zip.Deflater
>    4:         66684        7468608  org.apache.spark.executor.TaskMetrics
>    5:         80070        7160112  [C
>    6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>    7:        206371        4952904  java.lang.Long
>
> the platform is using netty 3.6.6 and openjdk 1.8 (tried on 1.7 as well
> with same issue).
>
> would anyone have a clue how to troubleshoot further?
>
> thx.
>


Re: driver OOM due to io.netty.buffer items not getting finalized

2015-12-22 Thread Ted Yu
I searched the code briefly.

The following files use ZipEntry, ZipOutputStream:

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
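
For reference, the spark.*.compress settings mentioned below can be turned off
for testing like this (standard 1.x property names; they can equally go into
spark-defaults.conf or --conf):

val conf = new org.apache.spark.SparkConf()
  .set("spark.broadcast.compress", "false")
  .set("spark.shuffle.compress", "false")
  .set("spark.shuffle.spill.compress", "false")
  .set("spark.rdd.compress", "false")          // already false by default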

FYI

On Tue, Dec 22, 2015 at 9:16 AM, Antony Mayi <antonym...@yahoo.com.invalid>
wrote:

> I narrowed it down to problem described for example here:
> https://bugs.openjdk.java.net/browse/JDK-6293787
>
> It is the mass finalization of zip Inflater/Deflater objects which can't
> keep up with the rate of these instances being garbage collected. as the
> jdk bug report (not being accepted as a bug) suggests this is an error of
> suboptimal destruction of the instances.
>
> Not sure where the zip comes from - for all the compressors used in spark
> I was using the default snappy codec.
>
> I am trying to disable all the spark.*.compress options and so far it
> seems this has dramatically improved, the finalization looks to be keeping
> up and the heap is stable.
>
> Any input is still welcome!
>
>
> On Tuesday, 22 December 2015, 12:17, Ted Yu <yuzhih...@gmail.com> wrote:
>
>
>
> This might be related but the jmap output there looks different:
>
>
> http://stackoverflow.com/questions/32537965/huge-number-of-io-netty-buffer-poolthreadcachememoryregioncacheentry-instances
>
> On Tue, Dec 22, 2015 at 2:59 AM, Antony Mayi <antonym...@yahoo.com.invalid
> > wrote:
>
> I have streaming app (pyspark 1.5.2 on yarn) that's crashing due to driver
> (jvm part, not python) OOM (no matter how big heap is assigned, eventually
> runs out).
>
> When checking the heap it is all taken by "byte" items of
> io.netty.buffer.PoolThreadCache. The number of
> io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry is constant yet the
> number of [B "bytes" keeps growing as well as the number of Finalizer
> instances. When checking the Finalizer instances it is all of
> ZipFile$ZipFileInputStream and ZipFile$ZipFileInflaterInputStream
>
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:        123556      278723776  [B
>    2:        258988       10359520  java.lang.ref.Finalizer
>    3:        174620        9778720  java.util.zip.Deflater
>    4:         66684        7468608  org.apache.spark.executor.TaskMetrics
>    5:         80070        7160112  [C
>    6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>    7:        206371        4952904  java.lang.Long
>
> the platform is using netty 3.6.6 and openjdk 1.8 (tried on 1.7 as well
> with same issue).
>
> would anyone have a clue how to troubleshoot further?
>
> thx.
>
>
>
>
>


Re: Stand Alone Cluster - Strange issue

2015-12-22 Thread Ted Yu
Which Spark release are you using ?

Cheers

On Tue, Dec 22, 2015 at 9:34 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I have a standalone cluster. One Master + One Slave. I'm getting below
> "NULL POINTER" exception.
>
> Could you please help me on this issue.
>
>
> *Code Block :-*
>  val accum = sc.accumulator(0)
> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) *==> This line
> giving exception.*
>
> Exception :-
>
> 15/12/22 09:18:26 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
> 0.0 (TID 1, 172.25.111.123): *java.lang.NullPointerException*
> at com.cc.ss.etl.Main$$anonfun$1.apply$mcVI$sp(Main.scala:25)
> at com.cc.ss.etl.Main$$anonfun$1.apply(Main.scala:25)
> at com.cc.ss.etl.Main$$anonfun$1.apply(Main.scala:25)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/12/22 09:18:26 INFO scheduler.TaskSetManager: Lost task 0.0 in stage
> 0.0 (TID 0) on executor 172.25.111.123: java.lang.NullPointerException
> (null) [duplicate 1]
> 15/12/22 09:18:26 INFO scheduler.TaskSetManager: Starting task 0.1 in
> stage 0.0 (TID 2, 172.25.111.123, PROCESS_LOCAL, 2155 bytes)
> 15/12/22 09:18:26 INFO scheduler.TaskSetManager: Starting task 1.1 in
> stage 0.0 (TID 3, 172.25.111.123, PROCESS_LOCAL, 2155 bytes)
> 15/12/22 09:18:26 WARN scheduler.TaskSetManager: Lost task 1.1 in stage
> 0.0 (TID 3, 172.25.111.123):
>
> Regards,
> Rajesh
>


Re: rdd only with one partition

2015-12-21 Thread Ted Yu
Have you tried the following method ?

   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large.
Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
Ordering[T] = null)
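
For example, a quick sketch in the shell:

val single = sc.parallelize(1 to 1000000, 1)        // an RDD with a single partition
val spread = single.coalesce(8, shuffle = true)     // same effect as single.repartition(8)
println(spread.partitions.length)                   // 8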

Cheers

On Mon, Dec 21, 2015 at 2:47 AM, Zhiliang Zhu 
wrote:

> Dear All,
>
> For some rdd, if there is just one partition, then the operations and
> arithmetic on it would run serially, and the rdd would lose all of the
> parallelism benefit of the spark system ...
>
> Is it exactly like that?
>
> Thanks very much in advance!
> Zhiliang
>
>
>


Re: Writing output fails when spark.unsafe.offHeap is enabled

2015-12-21 Thread Ted Yu
w.r.t.

at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:202)


I looked at UnsafeExternalRowSorter.java in 1.6.0 which only has 192 lines
of code.
Can you run with the latest RC of 1.6.0 and paste the stack trace?
Thanks

On Thu, Dec 17, 2015 at 5:04 PM, Mayuresh Kunjir 
wrote:

> I am testing a simple Sort program written using Dataframe APIs. When I
> enable spark.unsafe.offHeap, the output stage fails with a NPE. The
> exception when run on spark-1.5.1 is copied below.
>
> ​
> Job aborted due to stage failure: Task 23 in stage 3.0 failed 4 times,
> most recent failure: Lost task 23.3 in stage 3.0 (TID 667, xeno-40):
> java.lang.NullPointerException
>
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering.compare(GenerateOrdering.scala:28)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:202)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:58)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:35)
>   at org.apache.spark.util.collection.TimSort.binarySort(TimSort.java:191)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:129)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:190)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:202)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:347)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:332)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:399)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
>   at 
> org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$executePartition$1(sort.scala:160)
>   at 
> org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
>   at 
> org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
>   at 
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> ​My program looks as follows:
>
> case class Data(key: String, value: String)
>
> ​val lines = sc.textFile(args(0), 1)
> val data = lines.map(_.split(" ")).map(t=>Data(t(0), t(1))).toDF()
> data.registerTempTable("data")
> val sorted = data.sort("key")
> sorted.save(args(1))
>
> ​I am running the program on Yarn v2.6 and have tried spark-1.5.1 as well
> as current snapshot of spark-1.6.0.
>
> ​Thanks and regards,
> ~Mayuresh​
>
>
>


Re: rdd only with one partition

2015-12-21 Thread Ted Yu
I am not familiar with your use case. Is it possible to perform the
randomized combination operation on subsets of the rows in rdd0? That way
you can increase the parallelism.

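A rough sketch of that idea (assumptions: rows only need to be combined with
other rows that land in the same random subset, 16 subsets give enough
parallelism, and rdd0 is stubbed out with integers):

import org.apache.spark.HashPartitioner
import scala.util.Random

val numSubsets = 16
val rdd0 = sc.parallelize(1 to 10000, 1)            // stand-in for the real single-partition RDD

val bucketed = rdd0
  .map(row => (Random.nextInt(numSubsets), row))    // assign each row a random subset id
  .partitionBy(new HashPartitioner(numSubsets))
  .values

val rdd1 = bucketed.mapPartitions { rows =>
  val subset = rows.toVector
  // combine rows pairwise within this subset only; replace with the real operation
  subset.combinations(2).map(pair => (pair(0), pair(1)))
}
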
Cheers

On Mon, Dec 21, 2015 at 9:40 AM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

> Hi Ted,
>
> Thanks a lot for your kind reply.
>
> I need to convert this rdd0 into another rdd1, whose rows are generated by a
> random combination operation over rdd0's rows.
> From that perspective, rdd0 would need to have just one partition in order to
> operate randomly on all of its rows together; however, it would also lose the
> spark parallelism benefit.
>
> Best Wishes!
> Zhiliang
>
>
>
>
> On Monday, December 21, 2015 11:17 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>
> Have you tried the following method ?
>
>* Note: With shuffle = true, you can actually coalesce to a larger
> number
>* of partitions. This is useful if you have a small number of
> partitions,
>* say 100, potentially with a few partitions being abnormally large.
> Calling
>* coalesce(1000, shuffle = true) will result in 1000 partitions with the
>* data distributed using a hash partitioner.
>*/
>   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
> Ordering[T] = null)
>
> Cheers
>
> On Mon, Dec 21, 2015 at 2:47 AM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid
> > wrote:
>
> Dear All,
>
> For some rdd, if there is just one partition, then the operations and
> arithmetic on it would run serially, and the rdd would lose all of the
> parallelism benefit of the spark system ...
>
> Is it exactly like that?
>
> Thanks very much in advance!
> Zhiliang
>
>
>
>
>
>


Re: How to convert and RDD to DF?

2015-12-20 Thread Ted Yu
See the comment for createDataFrame(rowRDD: RDD[Row], schema: StructType)
method:

   * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s using the
given schema.
   * It is important to make sure that the structure of every [[Row]] of
the provided RDD matches
   * the provided schema. Otherwise, there will be runtime exception.
   * Example:
   * {{{
   *  import org.apache.spark.sql._
   *  import org.apache.spark.sql.types._
   *  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
   *
   *  val schema =
   *StructType(
   *  StructField("name", StringType, false) ::
   *  StructField("age", IntegerType, true) :: Nil)
   *
   *  val people =
   *sc.textFile("examples/src/main/resources/people.txt").map(
   *  _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
   *  val dataFrame = sqlContext.createDataFrame(people, schema)
   *  dataFrame.printSchema
   *  // root
   *  // |-- name: string (nullable = false)
   *  // |-- age: integer (nullable = true)
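
In this case that means converting the RDD of tuples into an RDD[Row] first,
along the lines of the following sketch (jsonGzip and schema are the ones
defined in the question below):

import org.apache.spark.sql.Row

val rowRDD = jsonGzip.map { case (cty, hse, nm, yrs) => Row(cty, hse, nm, yrs) }
val unzipJSON = sqlContext.createDataFrame(rowRDD, schema)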

Cheers

On Sun, Dec 20, 2015 at 6:31 AM, Eran Witkon  wrote:

> Hi,
>
> I have an RDD
> jsonGzip
> res3: org.apache.spark.rdd.RDD[(String, String, String, String)] =
> MapPartitionsRDD[8] at map at <console>:65
>
> which I want to convert to a DataFrame with schema
> so I created a schema:
>
> val schema =
>   StructType(
> StructField("cty", StringType, false) ::
>   StructField("hse", StringType, false) ::
> StructField("nm", StringType, false) ::
>   StructField("yrs", StringType, false) ::Nil)
>
> and called
>
> val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
> <console>:36: error: overloaded method value createDataFrame with 
> alternatives:
>   (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: 
> Class[_])org.apache.spark.sql.DataFrame 
>   (rdd: org.apache.spark.rdd.RDD[_],beanClass: 
> Class[_])org.apache.spark.sql.DataFrame 
>   (rowRDD: 
> org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: 
> org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
>   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: 
> org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
>  cannot be applied to (org.apache.spark.rdd.RDD[(String, String, String, 
> String)], org.apache.spark.sql.types.StructType)
>val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
>
>
> But as you see I don't have the right RDD type.
>
> So how cane I get the a dataframe with the right column names?
>
>
>


Re: Getting an error in insertion to mysql through sparkcontext in java..

2015-12-20 Thread Ted Yu
Was there a stack trace following the error?

Which Spark release are you using ?

Cheers

> On Dec 19, 2015, at 10:43 PM, Sree Eedupuganti  wrote:
> 
> i had 9 rows in my Mysql table
> 
> 
> options.put("dbtable", "(select * from employee");
>options.put("lowerBound", "1");
>options.put("upperBound", "8");
>options.put("numPartitions", "2");
> Error : Parameter index out of range (1 > number of parameters, which is 0)
> 
> -- 
> Best Regards,
> Sreeharsha Eedupuganti
> Data Engineer
> innData Analytics Private Limited


Re: spark 1.5.2 memory leak? reading JSON

2015-12-19 Thread Ted Yu
The 'Failed to parse a value' error was the cause of the execution failure.

Can you disclose the structure of your json file ?

Maybe try latest 1.6.0 RC to see if the problem goes away.

Thanks

On Sat, Dec 19, 2015 at 1:55 PM, Eran Witkon  wrote:

> Hi,
> I tried the following code in spark-shell on spark1.5.2:
>
> *val df =
> sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")*
> *df.count()*
>
> 15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size =
> 67108864 bytes, TID = 3
> 15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID
> 3)
> java.lang.RuntimeException: Failed to parse a value for data type
> StructType() (current token: VALUE_STRING).
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
> at
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
> at
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>
> Am I am doing something wrong?
> Eran
>


Re: how to fetch all of data from hbase table in spark java

2015-12-19 Thread Ted Yu
Please take a look at:
examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
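
The core of that example is roughly the following (a sketch: the table name is
a placeholder and an hbase-site.xml must be on the classpath):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "my_table")   // placeholder table name

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
println(hBaseRDD.count())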

There are various hbase connectors (search for 'apache spark hbase
connector').

In hbase 2.0, there will be an hbase-spark module which provides an hbase
connector.

FYI

On Fri, Dec 18, 2015 at 11:56 PM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> Hello experts... i am new to spark, anyone please explain me how to fetch
> data from hbase table in spark java
> Thanks in Advance...
>


Re: Does calling sqlContext.cacheTable("oldTableName") remove the cached contents of the oldTable

2015-12-18 Thread Ted Yu
CacheManager#cacheQuery() is called where:
  * Caches the data produced by the logical representation of the given
[[Queryable]].
...
val planToCache = query.queryExecution.analyzed
if (lookupCachedData(planToCache).nonEmpty) {

Is the schema for dfNew different from that of dfOld ?

Cheers

On Fri, Dec 18, 2015 at 3:33 AM, Sahil Sareen  wrote:

> Spark 1.5.2
>
> dfOld.registerTempTable("oldTableName")
> sqlContext.cacheTable("oldTableName")
> // 
> // do something
> // 
> dfNew.registerTempTable("oldTableName")
> sqlContext.cacheTable("oldTableName")
>
>
> Now when I use the "oldTableName" table I do get the latest contents
> from dfNew but do the contents of dfOld get removed from the memory?
>
> Or is the right usage to do this:
> dfOld.registerTempTable("oldTableName")
> sqlContext.cacheTable("oldTableName")
> // 
> // do something
> // 
> dfNew.registerTempTable("oldTableName")
> sqlContext.unCacheTable("oldTableName") <== unCache the old
> contents first
> sqlContext.cacheTable("oldTableName")
>
> -Sahil
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: HiveContext Self join not reading from cache

2015-12-18 Thread Ted Yu
>> Code Generation: true
>>
>>
>>
>> Regards,
>> Gourav
>>
>> On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> hi,
>>>
>>> I think that people have reported the same issue elsewhere, and this
>>> should be registered as a bug in SPARK
>>>
>>> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> The self join works fine on tbales where the hivecontext tables are
>>>> direct hive tables, therefore
>>>>
>>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>>> table1.registerTempTable("table1")
>>>> table1.cache()
>>>> table1.count()
>>>>
>>>> and if I do a self join on table1 things are quite fine
>>>>
>>>> But in case we have something like this:
>>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>>> table1.registerTempTable("table1")
>>>> table1.cache()
>>>> table1.count()
>>>>
>>>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>>>> table2.registerTempTable("table2")
>>>> table2.cache()
>>>> table2.count()
>>>>
>>>> table3 = hiveContext.sql("select table1.* from table1 table2 where
>>>> table1.columnA = table2.columnA")
>>>> table3.registerTempTable("table3")
>>>> table3.cache()
>>>> table3.count()
>>>>
>>>>
>>>> then the self join on table3 does not take data from table3 cache,
>>>> neither from table1 or table2 cache but starts taking data directly from S3
>>>> - which as you would understand does not make any sense.
>>>>
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> I did the following exercise in spark-shell ("c" is cached table):
>>>>>
>>>>> scala> sqlContext.sql("select x.b from c x join c y on x.a =
>>>>> y.a").explain
>>>>> == Physical Plan ==
>>>>> Project [b#4]
>>>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>>>>:- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation
>>>>> [a#3,b#4,c#5], true, 1, StorageLevel(true, true, false, true, 1),
>>>>> ConvertToUnsafe, Some(c)
>>>>>+- InMemoryColumnarTableScan [a#125], InMemoryRelation
>>>>> [a#125,b#126,c#127], true, 1, StorageLevel(true, true, false, true, 
>>>>> 1),
>>>>> ConvertToUnsafe, Some(c)
>>>>>
>>>>> sqlContext.sql("select x.b, y.c from c x join c y on x.a =
>>>>> y.a").registerTempTable("d")
>>>>> scala> sqlContext.cacheTable("d")
>>>>>
>>>>> scala> sqlContext.sql("select x.b from d x join d y on x.c =
>>>>> y.c").explain
>>>>> == Physical Plan ==
>>>>> Project [b#4]
>>>>> +- SortMergeJoin [c#90], [c#253]
>>>>>:- Sort [c#90 ASC], false, 0
>>>>>:  +- TungstenExchange hashpartitioning(c#90,200), None
>>>>>: +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation
>>>>> [b#4,c#90], true, 1, StorageLevel(true, true, false, true, 1), Project
>>>>> [b#4,c#90], Some(d)
>>>>>+- Sort [c#253 ASC], false, 0
>>>>>   +- TungstenExchange hashpartitioning(c#253,200), None
>>>>>  +- InMemoryColumnarTableScan [c#253], InMemoryRelation
>>>>> [b#246,c#253], true, 1, StorageLevel(true, true, false, true, 1),
>>>>> Project [b#4,c#90], Some(d)
>>>>>
>>>>> Is the above what you observed ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is how the data  can be created:
>>>>>>
>>>>>> 1. TableA : cached()
>>>>>> 2. TableB : cached()
>>>>>> 3. TableC: TableA inner join TableB cached()
>>>>>> 4. TableC join TableC does not take the data from cache but starts
>>>>>> reading the data for TableA and TableB from disk.
>>>>>>
>>>>>> Does this sound like a bug? The self join between TableA and TableB
>>>>>> are working fine and taking data from cache.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark with log4j

2015-12-18 Thread Ted Yu
See this thread:
http://search-hadoop.com/m/q3RTtEor1vYWbsW

which mentioned:
SPARK-11105 Disitribute the log4j.properties files from the client to the
executors
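
A commonly used pattern (per the Spark on YARN docs; all paths and names below
are placeholders) is to ship the file with --files and point both the driver
and the executors at it:

spark-submit \
  --files /local/path/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/local/path/log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --class your.Main your-app.jar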

FYI

On Fri, Dec 18, 2015 at 7:23 AM, Kalpesh Jadhav <
kalpesh.jad...@citiustech.com> wrote:

> Hi all,
>
>
>
> I am new to spark, I am trying to use log4j for logging my application.
>
> But anyhow the logs are not getting written to the specified file.
>
>
>
> I have created application using maven, and kept log.properties file at
> resources folder.
>
> Application written in scala .
>
>
>
> If there is any alternative instead of log4j then also it will work, but I
> wanted to see logs in file.
>
>
>
> If any changes need to be done in Hortonworks for the Spark configuration,
> please mention that as well.
>
>
>
> If anyone has done before or on github any source available please respond.
>
>
>
>
>
> Thanks,
>
> Kalpesh Jadhav
>
>
