Re: Exception when reading multiline JSON file

2019-09-12 Thread Kevin Mellott
Hi Kumaresh,

This is most likely an issue with your Spark cluster not being large enough
to accomplish the desired task. Typical hints of this situation are stack
traces that mention things like a size limitation being exceeded or a lost
node.

However, this is also a great opportunity for you to learn how to monitor a
Spark job for performance. The guidance on the Spark website (linked below)
will get you started, and you will want to dig into the Spark cluster UI and
the driver log files to see that information in action. Databricks also makes
situations like this easier to troubleshoot, because you can execute the
Spark code one step at a time using their visual notebook experience.

Hope that helps point you in the right direction.

https://spark.apache.org/docs/latest/monitoring.html
https://m.youtube.com/watch?v=KscZf1y97m8
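
As a starting point, here is a minimal sketch (shown in Scala for
illustration; the app name and log directory are placeholders) of enabling
event logging, so that finished applications can also be reviewed through the
history server UI described in the monitoring guide:

import org.apache.spark.sql.SparkSession

// Event logs let the history server reconstruct the web UI for completed jobs.
val spark = SparkSession.builder()
  .appName("multiline-json-debug")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "hdfs:///tmp/spark-events")
  .getOrCreate()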

Kevin

On Thu, Sep 12, 2019 at 12:04 PM Kumaresh AK 
wrote:

> Hello Spark Community!
> I am new to Spark. I tried to read a multiline json file (has around 2M
> records and gzip size is about 2GB) and encountered an exception. It works
> if I convert the same file into jsonl before reading it via spark.
> Unfortunately the file is private and I cannot share it. Is there any
> information that can help me narrow it down?
> I tried writing to parquet and json. Both face the same exception. This is
> the short form of the code:
>
> df = spark.read.option('multiline', 'true').json("file.json.gz") \
> .select(explode("objects")).select("col.*")
>
> df.write.mode('overwrite').json(p)
> # this crashes too: df.write.mode('overwrite').parquet(p)
>
>
> Json file is of the form:
>
> {
>   "version": "xyz",
>   "objects": [
>     { "id": "abc", ... },
>     { "id": "def", ... },
>     ...
>   ]
> }
>
>
> This is the exception:
>
>> 2019-09-12 14:02:01,515 WARN scheduler.TaskSetManager: Lost task 0.0 in
>> stage 1.0 (TID 1, 172.21.0.3, executor 0): org.apache.spark.SparkException:
>> Task failed while writing rows.
>> at
>> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>> at
>> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>> at
>> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> at org.apache.spark.scheduler.Task.run(Task.scala:121)
>> at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
>> at
>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: Cannot grow BufferHolder
>> by size 16 because the size after growing exceeds size limitation 2147483632
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:62)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.writeUnalignedBytes(UnsafeWriter.java:126)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown
>> Source)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_6$(Unknown
>> Source)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1$$anonfun$apply$1.apply(FileFormat.scala:149)
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1$$anonfun$apply$1.apply(FileFormat.scala:148)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:104)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>> at 

Re: How to sleep Spark job

2019-01-22 Thread Kevin Mellott
I’d recommend using a scheduler of some kind to trigger your job each hour,
and have the Spark job exit when it completes. Spark is not meant to run in
any type of “sleep mode”, unless you want to run a structured streaming job
and create a separate process to pull data from Cassandra and publish it to
your streaming endpoint. That decision really depends more on your use case.

On Tue, Jan 22, 2019 at 11:56 PM Soheil Pourbafrani 
wrote:

> Hi,
>
> I want to submit a job in YARN cluster to read data from Cassandra and
> write them in HDFS, every hour, for example.
>
> Is it possible to make Spark Application sleep in a while true loop and
> awake every hour to process data?
>


Re: Aggregated column name

2017-03-23 Thread Kevin Mellott
I'm not sure of the answer to your question; however, when performing
aggregates I find it useful to specify an *alias* for each column. That
will give you explicit control over the name of the resulting column.

In your example, that would look something like:

df.groupBy(col("...")).agg(count("number").alias("ColumnNameCount"))
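
A quick self-contained sketch (made-up data, assuming spark.implicits._ is in
scope) showing the effect on the resulting column name:

import org.apache.spark.sql.functions.count

val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "number")
val counts = df.groupBy($"key").agg(count("number").alias("ColumnNameCount"))
counts.printSchema()
// root
//  |-- key: string (nullable = true)
//  |-- ColumnNameCount: long (nullable = false)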

Hope that helps!
Kevin

On Thu, Mar 23, 2017 at 2:41 AM, Wen Pei Yu  wrote:

> Hi All
>
> I found that some Spark versions (Spark 1.4) return an upper case aggregated
> column name, and some return lower case.
> As below code,
> df.groupby(col("...")).agg(count("number"))
> may return
>
> COUNT(number)  -- Spark 1.4
> count(number)  -- Spark 1.6
>
> Anyone know if there is configure parameter for this, or which PR change
> this?
>
> Thank you very much.
> Yu Wenpei.
>


Re: pivot over non numerical data

2017-02-01 Thread Kevin Mellott
This should work for non-numerical data as well - can you please elaborate
on the error you are getting and provide a code sample? As a preliminary
hint, you can "aggregate" text values using *max*.

df.groupBy("someCol")
  .pivot("anotherCol")
  .agg(max($"textCol"))
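
For example, a minimal sketch (made-up column names and data, assuming
spark.implicits._ is in scope):

import org.apache.spark.sql.functions.max

val df = Seq(
  ("prod1", "color", "red"),
  ("prod1", "size", "large"),
  ("prod2", "color", "blue"),
  ("prod2", "size", "small")
).toDF("someCol", "anotherCol", "textCol")

df.groupBy("someCol")
  .pivot("anotherCol")
  .agg(max($"textCol"))
  .show()

// Approximate output (row/column order may vary):
// +-------+-----+-----+
// |someCol|color| size|
// +-------+-----+-----+
// |  prod1|  red|large|
// |  prod2| blue|small|
// +-------+-----+-----+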

Thanks,
Kevin

On Wed, Feb 1, 2017 at 2:02 PM, Darshan Pandya 
wrote:

> Hello,
>
> I am trying to transpose some data using groupBy pivot aggr as mentioned
> in this blog
> https://databricks.com/blog/2016/02/09/reshaping-data-
> with-pivot-in-apache-spark.html
>
> But this works only for numerical data.
> Any hints for doing the same thing for non numerical data ?
>
>
> --
> Sincerely,
> Darshan
>
>


Re: Nearest neighbour search

2016-11-14 Thread Kevin Mellott
You may be able to benefit from Soundcloud's open source implementation,
either as a solution or as a reference implementation.

https://github.com/soundcloud/cosine-lsh-join-spark

Thanks,
Kevin

On Sun, Nov 13, 2016 at 2:07 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> That was a bit of a brute force search, so I changed the code to use a UDF
> to create the dot product between the two IDF vectors, and do a sort on the
> new column.
>
> package com.ss.ml.clustering
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
> import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF}
> import org.apache.spark.ml.linalg.Vector
>
> object ClusteringBasics extends App {
>
>   val spark = SparkSession.builder().appName("Clustering 
> Basics").master("local").getOrCreate()
>   import spark.implicits._
>
>   val df = spark.read.option("header", "false").csv("data")
>
>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>   val df1 = tf.transform(tk.transform(df))
>   val idfs = idf.fit(df1).transform(df1)
>
>   val nn = nearestNeighbour("", 
> idfs)
>   println(nn)
>
>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
> val tfIdfSrc = ds.filter(s"_c0 == 
> '$uri'").take(1)(0).getAs[Vector]("tf-idf")
> def dorProduct(vectorA: Vector) = {
>   var dp = 0.0
>   var index = vectorA.size - 1
>   for (i <- 0 to index) {
> dp += vectorA(i) * tfIdfSrc(i)
>   }
>   dp
> }
> val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1))
> ds.filter(s"_c0 != '$uri'").withColumn("dp", 
> dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1)
>   }
>
> }
>
>
> However, that is generating the exception below,
>
> Exception in thread "main" java.lang.RuntimeException: Unsupported literal
> type class org.apache.spark.ml.feature.IDF idf_e49381a285dd
> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(
> literals.scala:57)
> at org.apache.spark.sql.functions$.lit(functions.scala:101)
> at org.apache.spark.sql.Column.$minus(Column.scala:672)
> at com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(
> ClusteringBasics.scala:36)
> at com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$
> clustering$ClusteringBasics$1(ClusteringBasics.scala:22)
> at com.ss.ml.clustering.ClusteringBasics$delayedInit$
> body.apply(ClusteringBasics.scala:8)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$
> sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.
> foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8)
> at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
>> This is what I have done, is there a better way of doing this?
>>
>>   val df = spark.read.option("header", "false").csv("data")
>>
>>
>>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>>
>>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>>
>>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>>
>>
>>   val df1 = tf.transform(tk.transform(df))
>>
>>   val idfs = idf.fit(df1).transform(df1)
>>
>>
>>   println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama;,
>> idfs))
>>
>>
>>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
>>
>> var res : Row = null
>>
>> var metric : Double = 0
>>
>> val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vect
>> or]("tf-idf")
>>
>> ds.filter("_c0 != '" + uri + "'").foreach { r =>
>>
>>   val tfIdfDst = r.getAs[Vector]("tf-idf")
>>
>>   val dp = dorProduct(tfIdfSrc, tfIdfDst)
>>
>>   if (dp > metric) {
>>
>> res = r
>>
>> metric = dp
>>
>>   }
>>
>> }
>>
>> return res.getAs[String]("_c1")
>>
>>   }
>>
>>
>>   def cosineSimilarity(vectorA: Vector, vectorB: Vector) = {
>>
>> var dotProduct = 0.0
>>
>> var normA = 0.0
>>
>> var normB = 0.0
>>
>> var index = vectorA.size - 1
>>
>> for (i <- 0 to index) {
>>
>>   dotProduct += 

Re: Spark Streaming Advice

2016-10-10 Thread Kevin Mellott
The batch interval was set to 30 seconds; however, after getting the
parquet files to save faster I lowered the interval to 10 seconds. The
number of log messages contained in each batch varied from just a few up to
around 3500, with the number of partitions ranging from 1 to around 15.

I will have to check out HBase as well; I've heard good things!

Thanks,
Kevin

On Mon, Oct 10, 2016 at 11:38 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Hi Kevin,
>
> What is the streaming interval (batch interval) above?
>
> I do analytics on streaming trade data but after manipulation of
> individual messages I store the selected on in Hbase. Very fast.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 October 2016 at 15:25, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Whilst working on this application, I found a setting that drastically
>> improved the performance of my particular Spark Streaming application. I'm
>> sharing the details in hopes that it may help somebody in a similar
>> situation.
>>
>> As my program ingested information into HDFS (as parquet files), I
>> noticed that the time to process each batch was significantly greater than
>> I anticipated. Whether I was writing a single parquet file (around 8KB) or
>> around 10-15 files (8KB each), that step of the processing was taking
>> around 30 seconds. Once I set the configuration below, this operation
>> reduced from 30 seconds to around 1 second.
>>
>> // ssc = instance of SparkStreamingContext
>> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
>> "false")
>>
>> I've also verified that the parquet files being generated are usable by
>> both Hive and Impala.
>>
>> Hope that helps!
>> Kevin
>>
>> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
>> wrote:
>>
>>> I'm attempting to implement a Spark Streaming application that will
>>> consume application log messages from a message broker and store the
>>> information in HDFS. During the data ingestion, we apply a custom schema to
>>> the logs, partition by application name and log date, and then save the
>>> information as parquet files.
>>>
>>> All of this works great, except we end up having a large number of
>>> parquet files created. It's my understanding that Spark Streaming is unable
>>> to control the number of files that get generated in each partition; can
>>> anybody confirm that is true?
>>>
>>> Also, has anybody else run into a similar situation regarding data
>>> ingestion with Spark Streaming and do you have any tips to share? Our end
>>> goal is to store the information in a way that makes it efficient to query,
>>> using a tool like Hive or Impala.
>>>
>>> Thanks,
>>> Kevin
>>>
>>
>>
>


Re: Spark Streaming Advice

2016-10-10 Thread Kevin Mellott
Whilst working on this application, I found a setting that drastically
improved the performance of my particular Spark Streaming application. I'm
sharing the details in hopes that it may help somebody in a similar
situation.

As my program ingested information into HDFS (as parquet files), I noticed
that the time to process each batch was significantly greater than I
anticipated. Whether I was writing a single parquet file (around 8KB) or
around 10-15 files (8KB each), that step of the processing was taking
around 30 seconds. Once I set the configuration below, this operation
reduced from 30 seconds to around 1 second.

// ssc = instance of SparkStreamingContext
ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
"false")

I've also verified that the parquet files being generated are usable by
both Hive and Impala.

Hope that helps!
Kevin

On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
wrote:

> I'm attempting to implement a Spark Streaming application that will
> consume application log messages from a message broker and store the
> information in HDFS. During the data ingestion, we apply a custom schema to
> the logs, partition by application name and log date, and then save the
> information as parquet files.
>
> All of this works great, except we end up having a large number of parquet
> files created. It's my understanding that Spark Streaming is unable to
> control the number of files that get generated in each partition; can
> anybody confirm that is true?
>
> Also, has anybody else run into a similar situation regarding data
> ingestion with Spark Streaming and do you have any tips to share? Our end
> goal is to store the information in a way that makes it efficient to query,
> using a tool like Hive or Impala.
>
> Thanks,
> Kevin
>


Re: Spark ML Decision Trees Algorithm

2016-09-30 Thread Kevin Mellott
The documentation details the algorithm being used at
http://spark.apache.org/docs/latest/mllib-decision-tree.html

Thanks,
Kevin

On Fri, Sep 30, 2016 at 1:14 AM, janardhan shetty 
wrote:

> Hi,
>
> Any help here is appreciated ..
>
> On Wed, Sep 28, 2016 at 11:34 AM, janardhan shetty  > wrote:
>
>> Is there a reference to the research paper which is implemented in spark
>> 2.0 ?
>>
>> On Wed, Sep 28, 2016 at 9:52 AM, janardhan shetty > > wrote:
>>
>>> Which algorithm is used under the covers while doing decision trees FOR
>>> SPARK ?
>>> for example: scikit-learn (python) uses an optimised version of the
>>> CART algorithm.
>>>
>>
>>
>


Re: Dataframe Grouping - Sorting - Mapping

2016-09-30 Thread Kevin Mellott
When you perform a .groupBy, you need to perform an aggregate immediately
afterwards.

For example:

import org.apache.spark.sql.functions.sum

val df1 = df.groupBy("colA").agg(sum(df("colB")))
df1.show()

More information and examples can be found in the documentation below.

http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.DataFrame

Thanks,
Kevin

On Fri, Sep 30, 2016 at 5:46 AM, AJT  wrote:

> I'm looking to do the following with my Spark dataframe
> (1) val df1 = df.groupBy()
> (2) val df2 = df1.sort()
> (3) val df3 = df2.mapPartitions()
>
> I can already groupBy the column (in this case a long timestamp) - but have
> no idea how then to ensure the returned GroupedData is then sorted by the
> same timeStamp and the mapped to my set of functions
>
> Appreciate any help
> Thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Dataframe-Grouping-Sorting-Mapping-tp27821.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Optimal/Expected way to run demo spark-scala scripts?

2016-09-23 Thread Kevin Mellott
You can run Spark code using the command line or by creating a JAR file
(via IntelliJ or other IDE); however, you may wish to try a Databricks
Community Edition account instead. They offer Spark as a managed service,
and you can run Spark commands one at a time via interactive notebooks.
There are built-in visualization tools, with the ability to integrate to
3rd party ones if you wish.

This type of development is very similar to iPython, but they provide a 6GB
cluster with their free accounts. They also provide many example notebooks
to help you learn various aspects of Spark.

https://databricks.com/try-databricks

Thanks,
Kevin

On Fri, Sep 23, 2016 at 2:37 PM, Dan Bikle  wrote:

> hello spark-world,
>
> I am new to spark and want to learn how to use it.
>
> I come from the Python world.
>
> I see an example at the url below:
>
> http://spark.apache.org/docs/latest/ml-pipeline.html#
> example-estimator-transformer-and-param
>
> What would be an optimal way to run the above example?
>
> In the Python world I would just feed the name of the script to Python on
> the command line.
>
> In the spark-world would people just start spark-shell and use a mouse to
> feed in the syntax?
>
> Perhaps people would follow the example here which uses a combo of sbt and
> spark-submit:
>
> http://spark.apache.org/docs/latest/quick-start.html#self-
> contained-applications
>
> ??
>
> Perhaps people usually have a Java-mindset and use an IDE built for
> spark-development?
> If so, which would be considered the best IDE for Spark? IntelliJ?
>
>


Re: In Spark-scala, how to fill Vectors.dense in DataFrame from CSV?

2016-09-22 Thread Kevin Mellott
You'll want to use the spark-csv package, which is included in Spark 2.0.
The repository documentation has some great usage examples.

https://github.com/databricks/spark-csv
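
For instance, a minimal sketch (made-up column names and file path), assuming
Spark 2.0 where CSV support is built into the DataFrame reader, followed by a
VectorAssembler to build the "features" vector column:

import org.apache.spark.ml.feature.VectorAssembler

// Read the CSV; inferSchema gives you numeric columns without manual casting.
val raw = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("training.csv")

// Combine the individual numeric columns into the single vector column
// expected by the ML estimators.
val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2", "f3"))
  .setOutputCol("features")

val training = assembler.transform(raw).select("label", "features")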

Thanks,
Kevin

On Thu, Sep 22, 2016 at 8:40 PM, Dan Bikle  wrote:

> hello spark-world,
>
> I am new to spark.
>
> I noticed this online example:
>
> http://spark.apache.org/docs/latest/ml-pipeline.html
>
> I am curious about this syntax:
>
> // Prepare training data from a list of (label, features) tuples.
> val training = spark.createDataFrame(Seq(
>   (1.0, Vectors.dense(0.0, 1.1, 0.1)),
>   (0.0, Vectors.dense(2.0, 1.0, -1.0)),
>   (0.0, Vectors.dense(2.0, 1.3, 1.0)),
>   (1.0, Vectors.dense(0.0, 1.2, -0.5))
> )).toDF("label", "features")
>
> Is it possible to replace the above call to some syntax which reads values
> from CSV?
>
> I want something comparable to Python-Pandas read_csv() method.
>
>


Re: unresolved dependency: datastax#spark-cassandra-connector;2.0.0-s_2.11-M3-20-g75719df: not found

2016-09-21 Thread Kevin Mellott
The "unresolved dependency" error is stating that the datastax dependency
could not be located in the Maven repository. I believe that this should
work if you change that portion of your command to the following.

--packages com.datastax.spark:spark-cassandra-connector_2.10:2.0.0-M3

You can verify the available versions by searching Maven at
http://search.maven.org.

Thanks,
Kevin

On Wed, Sep 21, 2016 at 3:38 AM, muhammet pakyürek 
wrote:

> while i run the spark-shell as below
>
> spark-shell --jars '/home/ktuser/spark-cassandra-
> connector/target/scala-2.11/root_2.11-2.0.0-M3-20-g75719df.jar'
> --packages datastax:spark-cassandra-connector:2.0.0-s_2.11-M3-20-g75719df
> --conf spark.cassandra.connection.host=localhost
>
> i get the error
> unresolved dependency: datastax#spark-cassandra-
> connector;2.0.0-s_2.11-M3-20-g75719df.
>
>
> the second question even if i added
>
> libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % 
> "2.0.0-M3"
>
> to spark-cassandra-connector/sbt/sbt file jar files are
> root_2.11-2.0.0-M3-20-g75719df
>
>
> the third question: after building the connector for Scala 2.11, how do I integrate
> it with pyspark?
>
>


Re: Similar Items

2016-09-20 Thread Kevin Mellott
Using the Soundcloud implementation of LSH, I was able to process a 22K
product dataset in a mere 65 seconds! Thanks so much for the help!

On Tue, Sep 20, 2016 at 1:15 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
wrote:

> Thanks Nick - those examples will help a ton!!
>
> On Tue, Sep 20, 2016 at 12:20 PM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
>> A few options include:
>>
>> https://github.com/marufaytekin/lsh-spark - I've used this a bit and it
>> seems quite scalable too from what I've looked at.
>> https://github.com/soundcloud/cosine-lsh-join-spark - not used this but
>> looks like it should do exactly what you need.
>> https://github.com/mrsqueeze/spark-hash
>>
>>
>> On Tue, 20 Sep 2016 at 18:06 Kevin Mellott <kevin.r.mell...@gmail.com>
>> wrote:
>>
>>> Thanks for the reply, Nick! I'm typically analyzing around 30-50K
>>> products at a time (as an isolated set of products). Within this set of
>>> products (which represents all products for a particular supplier), I am
>>> also analyzing each category separately. The largest categories typically
>>> have around 10K products.
>>>
>>> That being said, when calculating IDFs for the 10K product set we come
>>> out with roughly 12K unique tokens. In other words, our vectors are 12K
>>> columns wide (although they are being represented using SparseVectors). We
>>> have a step that is attempting to locate all documents that share the same
>>> tokens, and for those items we will calculate the cosine similarity.
>>> However, the part that attempts to identify documents with shared tokens is
>>> the bottleneck.
>>>
>>> For this portion, we map our data down to the individual tokens
>>> contained by each document. For example:
>>>
>>> DocumentId   |   Description
>>> 
>>> 
>>> 1   Easton Hockey Stick
>>> 2   Bauer Hockey Gloves
>>>
>>> In this case, we'd map to the following:
>>>
>>> (1, 'Easton')
>>> (1, 'Hockey')
>>> (1, 'Stick')
>>> (2, 'Bauer')
>>> (2, 'Hockey')
>>> (2, 'Gloves')
>>>
>>> Our goal is to aggregate this data as follows; however, our code that
>>> currently does this does not perform well. In the realistic 12K product
>>> scenario, this resulted in 430K document/token tuples.
>>>
>>> ((1, 2), ['Hockey'])
>>>
>>> This then tells us that documents 1 and 2 need to be compared to one
>>> another (via cosine similarity) because they both contain the token
>>> 'hockey'. I will investigate the methods that you recommended to see if
>>> they may resolve our problem.
>>>
>>> Thanks,
>>> Kevin
>>>
>>> On Tue, Sep 20, 2016 at 1:45 AM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
>>>> How many products do you have? How large are your vectors?
>>>>
>>>> It could be that SVD / LSA could be helpful. But if you have many
>>>> products then trying to compute all-pair similarity with brute force is not
>>>> going to be scalable. In this case you may want to investigate hashing
>>>> (LSH) techniques.
>>>>
>>>>
>>>> On Mon, 19 Sep 2016 at 22:49, Kevin Mellott <kevin.r.mell...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to write a Spark application that will detect similar items
>>>>> (in this case products) based on their descriptions. I've got an ML
>>>>> pipeline that transforms the product data to TF-IDF representation, using
>>>>> the following components.
>>>>>
>>>>>- *RegexTokenizer* - strips out non-word characters, results in a
>>>>>list of tokens
>>>>>- *StopWordsRemover* - removes common "stop words", such as "the",
>>>>>"and", etc.
>>>>>- *HashingTF* - assigns a numeric "hash" to each token and
>>>>>calculates the term frequency
>>>>>- *IDF* - computes the inverse document frequency
>>>>>
>>>>> After this pipeline evaluates, I'm left with a SparseVector that
>>>>> represents the inverse document frequency of tokens for each product. As a
>>>>> next step, I'd like to be able to compare each vector to one another, to
>>>>> detect similarities.
>>>>>
>>>>> Does anybody know of a straightforward way to do this in Spark? I
>>>>> tried creating a UDF (that used the Breeze linear algebra methods
>>>>> internally); however, that did not scale well.
>>>>>
>>>>> Thanks,
>>>>> Kevin
>>>>>
>>>>
>>>
>


Re: write.df is failing on Spark Cluster

2016-09-20 Thread Kevin Mellott
Are you able to manually delete the folder below? I'm wondering if there is
some sort of non-Spark factor involved (permissions, etc).

/nfspartition/sankar/banking_l1_v2.csv

On Tue, Sep 20, 2016 at 12:19 PM, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> I used that one also
>
> On Sep 20, 2016 10:44 PM, "Kevin Mellott" <kevin.r.mell...@gmail.com>
> wrote:
>
>> Instead of *mode="append"*, try *mode="overwrite"*
>>
>> On Tue, Sep 20, 2016 at 11:30 AM, Sankar Mittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> Please find the code below.
>>>
>>> sankar2 <- read.df("/nfspartition/sankar/test/2016/08/test.json")
>>>
>>> I tried these two commands.
>>> write.df(sankar2,"/nfspartition/sankar/test/test.csv","csv",
>>> header="true")
>>>
>>> saveDF(sankar2,"sankartest.csv",source="csv",mode="append",schema="true")
>>>
>>>
>>>
>>> On Tue, Sep 20, 2016 at 9:40 PM, Kevin Mellott <
>>> kevin.r.mell...@gmail.com> wrote:
>>>
>>>> Can you please post the line of code that is doing the df.write command?
>>>>
>>>> On Tue, Sep 20, 2016 at 9:29 AM, Sankar Mittapally <
>>>> sankar.mittapa...@creditvidya.com> wrote:
>>>>
>>>>> Hey Kevin,
>>>>>
>>>>> It is an empty directory. It is able to write part files to the
>>>>> directory, but while merging those part files we are getting the above error.
>>>>>
>>>>> Regards
>>>>>
>>>>>
>>>>> On Tue, Sep 20, 2016 at 7:46 PM, Kevin Mellott <
>>>>> kevin.r.mell...@gmail.com> wrote:
>>>>>
>>>>>> Have you checked to see if any files already exist at
>>>>>> /nfspartition/sankar/banking_l1_v2.csv? If so, you will need to
>>>>>> delete them before attempting to save your DataFrame to that location.
>>>>>> Alternatively, you may be able to specify the "mode" setting of the
>>>>>> df.write operation to "overwrite", depending on the version of Spark you
>>>>>> are running.
>>>>>>
>>>>>> *ERROR (from log)*
>>>>>> 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
>>>>>> dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task
>>>>>> _201609170802_0013_m_00/.part-r-0-46a7f178-2490-444e
>>>>>> -9110-510978eaaecb.csv.crc]:
>>>>>> it still exists.
>>>>>> 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
>>>>>> dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task
>>>>>> _201609170802_0013_m_00/part-r-0-46a7f178-2490-444e-
>>>>>> 9110-510978eaaecb.csv]:
>>>>>> it still exists.
>>>>>>
>>>>>> *df.write Documentation*
>>>>>> http://spark.apache.org/docs/latest/api/R/write.df.html
>>>>>>
>>>>>> Thanks,
>>>>>> Kevin
>>>>>>
>>>>>> On Tue, Sep 20, 2016 at 12:16 AM, sankarmittapally <
>>>>>> sankar.mittapa...@creditvidya.com> wrote:
>>>>>>
>>>>>>>  We have setup a spark cluster which is on NFS shared storage, there
>>>>>>> is no
>>>>>>> permission issues with NFS storage, all the users are able to write
>>>>>>> to NFS
>>>>>>> storage. When I fired write.df command in SparkR, I am getting
>>>>>>> below. Can
>>>>>>> some one please help me to fix this issue.
>>>>>>>
>>>>>>>
>>>>>>> 16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting
>>>>>>> job.
>>>>>>> java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus
>>>>>>> {path=file:/nfspartition/sankar/banking_l1_v2.csv/_temporary
>>>>>>> /0/task_201609170802_0013_m_00/part-r-0-46a7f178-249
>>>>>>> 0-444e-9110-510978eaaecb.csv;
>>>>>>> isDirectory=false; length=436486316; replication=1;
>>>>>>> blocksize=33554432;
>>>>>>> modification_time=147409940; access_time=0; owner=; group=;
>>>>>>> permiss

Re: Similar Items

2016-09-20 Thread Kevin Mellott
Thanks Nick - those examples will help a ton!!

On Tue, Sep 20, 2016 at 12:20 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> A few options include:
>
> https://github.com/marufaytekin/lsh-spark - I've used this a bit and it
> seems quite scalable too from what I've looked at.
> https://github.com/soundcloud/cosine-lsh-join-spark - not used this but
> looks like it should do exactly what you need.
> https://github.com/mrsqueeze/spark-hash
>
>
> On Tue, 20 Sep 2016 at 18:06 Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Thanks for the reply, Nick! I'm typically analyzing around 30-50K
>> products at a time (as an isolated set of products). Within this set of
>> products (which represents all products for a particular supplier), I am
>> also analyzing each category separately. The largest categories typically
>> have around 10K products.
>>
>> That being said, when calculating IDFs for the 10K product set we come
>> out with roughly 12K unique tokens. In other words, our vectors are 12K
>> columns wide (although they are being represented using SparseVectors). We
>> have a step that is attempting to locate all documents that share the same
>> tokens, and for those items we will calculate the cosine similarity.
>> However, the part that attempts to identify documents with shared tokens is
>> the bottleneck.
>>
>> For this portion, we map our data down to the individual tokens contained
>> by each document. For example:
>>
>> DocumentId   |   Description
>> 
>> 
>> 1   Easton Hockey Stick
>> 2   Bauer Hockey Gloves
>>
>> In this case, we'd map to the following:
>>
>> (1, 'Easton')
>> (1, 'Hockey')
>> (1, 'Stick')
>> (2, 'Bauer')
>> (2, 'Hockey')
>> (2, 'Gloves')
>>
>> Our goal is to aggregate this data as follows; however, our code that
>> currently does this does not perform well. In the realistic 12K product
>> scenario, this resulted in 430K document/token tuples.
>>
>> ((1, 2), ['Hockey'])
>>
>> This then tells us that documents 1 and 2 need to be compared to one
>> another (via cosine similarity) because they both contain the token
>> 'hockey'. I will investigate the methods that you recommended to see if
>> they may resolve our problem.
>>
>> Thanks,
>> Kevin
>>
>> On Tue, Sep 20, 2016 at 1:45 AM, Nick Pentreath <nick.pentre...@gmail.com
>> > wrote:
>>
>>> How many products do you have? How large are your vectors?
>>>
>>> It could be that SVD / LSA could be helpful. But if you have many
>>> products then trying to compute all-pair similarity with brute force is not
>>> going to be scalable. In this case you may want to investigate hashing
>>> (LSH) techniques.
>>>
>>>
>>> On Mon, 19 Sep 2016 at 22:49, Kevin Mellott <kevin.r.mell...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to write a Spark application that will detect similar items
>>>> (in this case products) based on their descriptions. I've got an ML
>>>> pipeline that transforms the product data to TF-IDF representation, using
>>>> the following components.
>>>>
>>>>- *RegexTokenizer* - strips out non-word characters, results in a
>>>>list of tokens
>>>>- *StopWordsRemover* - removes common "stop words", such as "the",
>>>>"and", etc.
>>>>- *HashingTF* - assigns a numeric "hash" to each token and
>>>>calculates the term frequency
>>>>- *IDF* - computes the inverse document frequency
>>>>
>>>> After this pipeline evaluates, I'm left with a SparseVector that
>>>> represents the inverse document frequency of tokens for each product. As a
>>>> next step, I'd like to be able to compare each vector to one another, to
>>>> detect similarities.
>>>>
>>>> Does anybody know of a straightforward way to do this in Spark? I tried
>>>> creating a UDF (that used the Breeze linear algebra methods internally);
>>>> however, that did not scale well.
>>>>
>>>> Thanks,
>>>> Kevin
>>>>
>>>
>>


Re: write.df is failing on Spark Cluster

2016-09-20 Thread Kevin Mellott
Instead of *mode="append"*, try *mode="overwrite"*

On Tue, Sep 20, 2016 at 11:30 AM, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> Please find the code below.
>
> sankar2 <- read.df("/nfspartition/sankar/test/2016/08/test.json")
>
> I tried these two commands.
> write.df(sankar2,"/nfspartition/sankar/test/test.csv","csv",header="true")
>
> saveDF(sankar2,"sankartest.csv",source="csv",mode="append",schema="true")
>
>
>
> On Tue, Sep 20, 2016 at 9:40 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Can you please post the line of code that is doing the df.write command?
>>
>> On Tue, Sep 20, 2016 at 9:29 AM, Sankar Mittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> Hey Kevin,
>>>
>>> It is an empty directory. It is able to write part files to the directory,
>>> but while merging those part files we are getting the above error.
>>>
>>> Regards
>>>
>>>
>>> On Tue, Sep 20, 2016 at 7:46 PM, Kevin Mellott <
>>> kevin.r.mell...@gmail.com> wrote:
>>>
>>>> Have you checked to see if any files already exist at
>>>> /nfspartition/sankar/banking_l1_v2.csv? If so, you will need to delete
>>>> them before attempting to save your DataFrame to that location.
>>>> Alternatively, you may be able to specify the "mode" setting of the
>>>> df.write operation to "overwrite", depending on the version of Spark you
>>>> are running.
>>>>
>>>> *ERROR (from log)*
>>>> 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
>>>> dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task
>>>> _201609170802_0013_m_00/.part-r-0-46a7f178-2490-444e
>>>> -9110-510978eaaecb.csv.crc]:
>>>> it still exists.
>>>> 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
>>>> dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task
>>>> _201609170802_0013_m_00/part-r-0-46a7f178-2490-444e-
>>>> 9110-510978eaaecb.csv]:
>>>> it still exists.
>>>>
>>>> *df.write Documentation*
>>>> http://spark.apache.org/docs/latest/api/R/write.df.html
>>>>
>>>> Thanks,
>>>> Kevin
>>>>
>>>> On Tue, Sep 20, 2016 at 12:16 AM, sankarmittapally <
>>>> sankar.mittapa...@creditvidya.com> wrote:
>>>>
>>>>>  We have setup a spark cluster which is on NFS shared storage, there
>>>>> is no
>>>>> permission issues with NFS storage, all the users are able to write to
>>>>> NFS
>>>>> storage. When I fired write.df command in SparkR, I am getting below.
>>>>> Can
>>>>> some one please help me to fix this issue.
>>>>>
>>>>>
>>>>> 16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting
>>>>> job.
>>>>> java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus
>>>>> {path=file:/nfspartition/sankar/banking_l1_v2.csv/_temporary
>>>>> /0/task_201609170802_0013_m_00/part-r-0-46a7f178-249
>>>>> 0-444e-9110-510978eaaecb.csv;
>>>>> isDirectory=false; length=436486316; replication=1; blocksize=33554432;
>>>>> modification_time=147409940; access_time=0; owner=; group=;
>>>>> permission=rw-rw-rw-; isSymlink=false}
>>>>> to
>>>>> file:/nfspartition/sankar/banking_l1_v2.csv/part-r-0-46a
>>>>> 7f178-2490-444e-9110-510978eaaecb.csv
>>>>> at
>>>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.m
>>>>> ergePaths(FileOutputCommitter.java:371)
>>>>> at
>>>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.m
>>>>> ergePaths(FileOutputCommitter.java:384)
>>>>> at
>>>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.c
>>>>> ommitJob(FileOutputCommitter.java:326)
>>>>> at
>>>>> org.apache.spark.sql.execution.datasources.BaseWriterContain
>>>>> er.commitJob(WriterContainer.scala:222)
>>>>> at
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>>>> sRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoo
>>>>> pFsRelationCommand.scala:144)
>>>&g

Re: write.df is failing on Spark Cluster

2016-09-20 Thread Kevin Mellott
Can you please post the line of code that is doing the df.write command?

On Tue, Sep 20, 2016 at 9:29 AM, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> Hey Kevin,
>
> It is an empty directory. It is able to write part files to the directory,
> but while merging those part files we are getting the above error.
>
> Regards
>
>
> On Tue, Sep 20, 2016 at 7:46 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Have you checked to see if any files already exist at
>> /nfspartition/sankar/banking_l1_v2.csv? If so, you will need to delete
>> them before attempting to save your DataFrame to that location.
>> Alternatively, you may be able to specify the "mode" setting of the
>> df.write operation to "overwrite", depending on the version of Spark you
>> are running.
>>
>> *ERROR (from log)*
>> 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
>> dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task
>> _201609170802_0013_m_00/.part-r-0-46a7f178-2490-444
>> e-9110-510978eaaecb.csv.crc]:
>> it still exists.
>> 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
>> dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task
>> _201609170802_0013_m_00/part-r-0-46a7f178-2490-444
>> e-9110-510978eaaecb.csv]:
>> it still exists.
>>
>> *df.write Documentation*
>> http://spark.apache.org/docs/latest/api/R/write.df.html
>>
>> Thanks,
>> Kevin
>>
>> On Tue, Sep 20, 2016 at 12:16 AM, sankarmittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>>  We have setup a spark cluster which is on NFS shared storage, there is
>>> no
>>> permission issues with NFS storage, all the users are able to write to
>>> NFS
>>> storage. When I fired write.df command in SparkR, I am getting below. Can
>>> some one please help me to fix this issue.
>>>
>>>
>>> 16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting job.
>>> java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus
>>> {path=file:/nfspartition/sankar/banking_l1_v2.csv/_temporary
>>> /0/task_201609170802_0013_m_00/part-r-0-46a7f178-249
>>> 0-444e-9110-510978eaaecb.csv;
>>> isDirectory=false; length=436486316; replication=1; blocksize=33554432;
>>> modification_time=147409940; access_time=0; owner=; group=;
>>> permission=rw-rw-rw-; isSymlink=false}
>>> to
>>> file:/nfspartition/sankar/banking_l1_v2.csv/part-r-0-46a
>>> 7f178-2490-444e-9110-510978eaaecb.csv
>>> at
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.m
>>> ergePaths(FileOutputCommitter.java:371)
>>> at
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.m
>>> ergePaths(FileOutputCommitter.java:384)
>>> at
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.c
>>> ommitJob(FileOutputCommitter.java:326)
>>> at
>>> org.apache.spark.sql.execution.datasources.BaseWriterContain
>>> er.commitJob(WriterContainer.scala:222)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>> sRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoo
>>> pFsRelationCommand.scala:144)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>> sRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRela
>>> tionCommand.scala:115)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>> sRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRela
>>> tionCommand.scala:115)
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutio
>>> nId(SQLExecution.scala:57)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>> sRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
>>> at
>>> org.apache.spark.sql.execution.command.ExecutedCommandExec.s
>>> ideEffectResult$lzycompute(commands.scala:60)
>>> at
>>> org.apache.spark.sql.execution.command.ExecutedCommandExec.s
>>> ideEffectResult(commands.scala:58)
>>> at
>>> org.apache.spark.sql.execution.command.ExecutedCommandExec.d
>>> oExecute(commands.scala:74)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.
>>> apply(SparkPlan.scala:115)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.
>>> apply(SparkPlan.scala:115)
&

Re: write.df is failing on Spark Cluster

2016-09-20 Thread Kevin Mellott
Have you checked to see if any files already exist at /nfspartition/sankar/
banking_l1_v2.csv? If so, you will need to delete them before attempting to
save your DataFrame to that location. Alternatively, you may be able to
specify the "mode" setting of the df.write operation to "overwrite",
depending on the version of Spark you are running.

*ERROR (from log)*
16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/
0/task_201609170802_0013_m_00/.part-r-0-46a7f178-
2490-444e-9110-510978eaaecb.csv.crc]:
it still exists.
16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/
0/task_201609170802_0013_m_00/part-r-0-46a7f178-
2490-444e-9110-510978eaaecb.csv]:
it still exists.

*df.write Documentation*
http://spark.apache.org/docs/latest/api/R/write.df.html

Thanks,
Kevin

On Tue, Sep 20, 2016 at 12:16 AM, sankarmittapally <
sankar.mittapa...@creditvidya.com> wrote:

>  We have setup a spark cluster which is on NFS shared storage, there is no
> permission issues with NFS storage, all the users are able to write to NFS
> storage. When I fired write.df command in SparkR, I am getting below. Can
> some one please help me to fix this issue.
>
>
> 16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting job.
> java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus
> {path=file:/nfspartition/sankar/banking_l1_v2.csv/_
> temporary/0/task_201609170802_0013_m_00/part-r-0-
> 46a7f178-2490-444e-9110-510978eaaecb.csv;
> isDirectory=false; length=436486316; replication=1; blocksize=33554432;
> modification_time=147409940; access_time=0; owner=; group=;
> permission=rw-rw-rw-; isSymlink=false}
> to
> file:/nfspartition/sankar/banking_l1_v2.csv/part-r-
> 0-46a7f178-2490-444e-9110-510978eaaecb.csv
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(
> FileOutputCommitter.java:371)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(
> FileOutputCommitter.java:384)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(
> FileOutputCommitter.java:326)
> at
> org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(
> WriterContainer.scala:222)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm
> and$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationComm
> and.scala:144)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm
> and$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm
> and$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:57)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationComm
> and.run(InsertIntoHadoopFsRelationCommand.scala:115)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult$lzycompute(commands.scala:60)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult(commands.scala:58)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(
> commands.scala:74)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:115)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(
> SparkPlan.scala:136)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(
> QueryExecution.scala:86)
> at
> org.apache.spark.sql.execution.QueryExecution.
> toRdd(QueryExecution.scala:86)
> at
> org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.
> scala:487)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.api.r.RBackendHandler.handleMethodCall(
> RBackendHandler.scala:141)
> at
> org.apache.spark.api.r.RBackendHandler.channelRead0(
> RBackendHandler.scala:86)
> at
> org.apache.spark.api.r.RBackendHandler.channelRead0(
> RBackendHandler.scala:38)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
> at
> 

Similar Items

2016-09-19 Thread Kevin Mellott
Hi all,

I'm trying to write a Spark application that will detect similar items (in
this case products) based on their descriptions. I've got an ML pipeline
that transforms the product data to TF-IDF representation, using the
following components.

   - *RegexTokenizer* - strips out non-word characters, results in a list
   of tokens
   - *StopWordsRemover* - removes common "stop words", such as "the",
   "and", etc.
   - *HashingTF* - assigns a numeric "hash" to each token and calculates
   the term frequency
   - *IDF* - computes the inverse document frequency

After this pipeline evaluates, I'm left with a SparseVector that represents
the inverse document frequency of tokens for each product. As a next step,
I'd like to be able to compare each vector to one another, to detect
similarities.
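
For reference, a rough sketch of that pipeline (column names are made up):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF, IDF}

val tokenizer = new RegexTokenizer()
  .setInputCol("description").setOutputCol("tokens").setPattern("\\W+")
val remover = new StopWordsRemover()
  .setInputCol("tokens").setOutputCol("filtered")
val tf = new HashingTF()
  .setInputCol("filtered").setOutputCol("tf")
val idf = new IDF()
  .setInputCol("tf").setOutputCol("tfidf")

val pipeline = new Pipeline().setStages(Array(tokenizer, remover, tf, idf))
val model = pipeline.fit(products)          // products has a "description" column
val features = model.transform(products)    // adds the "tfidf" SparseVector column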

Does anybody know of a straightforward way to do this in Spark? I tried
creating a UDF (that used the Breeze linear algebra methods internally);
however, that did not scale well.

Thanks,
Kevin


Re: study materials for operators on Dataframe

2016-09-19 Thread Kevin Mellott
I would recommend signing up for a Databricks Community Edition account. It
will give you access to a 6GB cluster, with many different example programs
that you can use to get started.

https://databricks.com/try-databricks

If you are looking for a more formal training method, I just completed the
EDX course linked below. The lecture videos were provided by UC-Berkeley
professors, and the labs are all run on Databricks. The classes are no
longer active (so no professor interactions); however, you can still access
all of the lectures and labs for free.

https://courses.edx.org/dashboard/programs/21/data-science-and-engineering-with-spark


PS: I am not in any way associated with Databricks, I just happen to find
their product extremely useful (especially for training purposes).

On Sun, Sep 18, 2016 at 9:41 PM, 颜发才(Yan Facai)  wrote:

> Hi,
> I am a newbie,
> and the official document of spark is too concise for me, especially the
> introduction of operators on dataframe.
>
> For python, pandas gives a very detailed document: [Pandas](
> http://pandas.pydata.org/pandas-docs/stable/index.html)
> so,
> does anyone know some sites or cookbooks which are more helpful for newbie?
>
> Thanks.
>


Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Kevin Mellott
Hi Anand,

Unfortunately, there is not really a "one size fits all" answer to this
question; however, here are some things that you may want to consider when
trying different sizes.

   - What is the size of the data you are processing?
   - Whenever you invoke an action that requires ALL of the data to be sent
   to the driver (such as collect), you'll need to ensure that your memory
   setting can handle it.
   - What level of parallelization does your code support? The more
   processing you can do on the worker nodes, the less your driver will need
   to do.

Related to these comments, keep in mind that the --executor-memory,
--num-executors, and --executor-cores configurations can be useful when
tuning the worker nodes. There is some great information in the Spark
Tuning Guide (linked below) that you may find useful as well.

http://spark.apache.org/docs/latest/tuning.html

Hope that helps!
Kevin

On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan <
anand_v...@ymail.com.invalid> wrote:

> Hi,
>
> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark.
>
> I am running a machine learning program, which runs perfectly by
> specifying 2G for —driver-memory.
> However the program cannot be run with default 1G, driver crashes with OOM
> error.
>
> What is the recommended configuration for —driver-memory…? Please suggest.
>
> Thanks and regards,
> Anand.
>
>


Re: Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?

2016-05-03 Thread Kevin Mellott
If you put this into a DataFrame, then you may be able to use one-hot
encoding and treat these as categorical features. I believe that the ML
pipeline components use Project Tungsten, so the performance will be very
fast. After you process the result on the DataFrame, you would then need to
assemble your desired format.
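
A rough sketch of that idea (column names are made up; the final per-patient
aggregation into the LibSVM line would still be a separate step):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}

// One row per (patient, procedure) pair in the input DataFrame `procedures`.
val indexer = new StringIndexer()
  .setInputCol("procedure").setOutputCol("procedureIndex")
val encoder = new OneHotEncoder()
  .setInputCol("procedureIndex").setOutputCol("procedureVec")

val pipeline = new Pipeline().setStages(Array(indexer, encoder))
val encoded = pipeline.fit(procedures).transform(procedures)
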
On May 3, 2016 4:29 PM, "Bibudh Lahiri"  wrote:

> Hi,
>   I have multiple procedure codes that a patient has undergone, in an RDD
> with a different row for each combination of patient and procedure. I am
> trying to covert this data to the LibSVM format, so that the resultant
> looks as follows:
>
>   "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"
>
>   where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given
> patient has undergone. Note that Spark needs these codes to be one-based
> and in ascending order, so I am using a combination of groupByKey() and
> mapValues() to do this conversion as follows:
>
> procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)
>
> where combine_procedures() is defined as:
>
> def combine_procedures(l_procs):
>   ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
>   return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])
>
>   Note that this reduction is neither commutative nor associative, since
> combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not
> going to work.
>   Can someone suggest some faster alternative to the combination
> of groupByKey() and mapValues() for this?
>
> Thanks
>Bibudh
>
>
> --
> Bibudh Lahiri
> Senior Data Scientist, Impetus Technolgoies
> 720 University Avenue, Suite 130
> Los Gatos, CA 95129
> http://knowthynumbers.blogspot.com/
>
>


Re: How to convert Parquet file to a text file.

2016-03-15 Thread Kevin Mellott
I'd recommend reading the parquet file into a DataFrame object, and then
using spark-csv  to write to a CSV
file.
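
A minimal sketch (paths are placeholders), assuming Spark 1.x with an existing
sqlContext and the spark-csv package available:

val df = sqlContext.read.parquet("/path/to/input.parquet")

df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/path/to/output_csv")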

On Tue, Mar 15, 2016 at 3:34 PM, Shishir Anshuman  wrote:

> I need to convert the parquet file generated by the spark to a text (csv
> preferably) file. I want to use the data model outside spark.
>
> Any suggestion on how to proceed?
>


Re: RDD recomputation

2016-03-10 Thread Kevin Mellott
I've had very good success troubleshooting this type of thing by using the
Spark Web UI, which will depict a breakdown of all tasks. This also
includes the RDDs being used, as well as any cached data. Additional
information about this tool can be found at
http://spark.apache.org/docs/latest/monitoring.html.
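
Once you identify an RDD that is being recomputed, a quick way to confirm the
fix is to persist it and inspect its lineage. A minimal sketch, assuming an
existing SparkContext sc (the path and transformation are placeholders):

import org.apache.spark.storage.StorageLevel

val expensive = sc.textFile("/path/to/data").map(_.toUpperCase)
expensive.persist(StorageLevel.MEMORY_AND_DISK)

// Persisted RDDs are flagged in the lineage output, and the Storage tab of
// the web UI will show them once an action has materialized the cache.
println(expensive.toDebugString)
expensive.count()   // first action computes and caches the partitions
expensive.count()   // subsequent actions reuse the cached partitions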

On Thu, Mar 10, 2016 at 1:31 PM, souri datta 
wrote:

> Hi,
>  Currently I am trying to optimize my spark application and in that
> process, I am trying to figure out if at any stage in the code, I am
> recomputing a large RDD (so that I can optimize it by
> persisting/checkpointing it).
>
> Is there any indication in the event logs that tells us about an RDD being
> computed?
> If anyone has done similar analysis, can you please share how you went
> about it.
>
> Thanks in advance,
> Souri
>


Re: how to implement ALS with csv file? getting error while calling Rating class

2016-03-07 Thread Kevin Mellott
If you are using DataFrames, then you also can specify the schema when
loading as an alternate solution. I've found Spark-CSV
 to be a very useful library when
working with CSV data.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
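
For example, a minimal sketch of supplying the schema up front (Spark 1.x with
an existing sqlContext and the spark-csv package; the path is a placeholder,
and the column names mirror the Name,Value1,Value2 layout from the question):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("value1", IntegerType, nullable = false),
  StructField("value2", DoubleType, nullable = false)))

val ratings = sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(schema)
  .load("ratings.csv")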


On Mon, Mar 7, 2016 at 1:10 AM, Nick Pentreath 
wrote:

> As you've pointed out, Rating requires user and item ids in Int form. So
> you will need to map String user ids to integers.
>
> See this thread for example:
> https://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAJgQjQ9GhGqpg1=hvxpfrs+59elfj9f7knhp8nyqnh1ut_6...@mail.gmail.com%3E
> .
>
> There is a DeveloperApi method
> in org.apache.spark.ml.recommendation.ALS that takes Rating with generic
> type (can be String) for user id and item id. However that is a little more
> involved, and for larger scale data will be a lot less efficient.
>
> Something like this for example:
>
> import org.apache.spark.ml.recommendation.ALS
> import org.apache.spark.ml.recommendation.ALS.Rating
>
> val conf = new SparkConf().setAppName("ALSWithStringID").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // Name,Value1,Value2.
> val rdd = sc.parallelize(Seq(
>   Rating[String]("foo", "1", 4.0f),
>   Rating[String]("foo", "2", 2.0f),
>   Rating[String]("bar", "1", 5.0f),
>   Rating[String]("bar", "3", 1.0f)
> ))
> val (userFactors, itemFactors) = ALS.train(rdd)
>
>
> As you can see, you just get the factor RDDs back, and if you want an
> ALSModel you will have to construct it yourself.
>
>
> On Sun, 6 Mar 2016 at 18:23 Shishir Anshuman 
> wrote:
>
>> I am new to apache Spark, and I want to implement the Alternating Least
>> Squares algorithm. The data set is stored in a csv file in the format:
>> *Name,Value1,Value2*.
>>
>> When I read the csv file, I get
>> *java.lang.NumberFormatException.forInputString* error because the
>> Rating class needs the parameters in the format: *(user: Int, product:
>> Int, rating: Double)* and the first column of my file contains *Name*.
>>
>> Please suggest me a way to overcome this issue.
>>
>


Re: Flattening Data within DataFrames

2016-02-29 Thread Kevin Mellott
Thanks Michal - this is exactly what I need.
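
For anyone who finds this thread later, here is a minimal sketch of the pivot
approach from that post (assuming Spark 1.6+ and the categories DataFrame from
the quoted message below; if Level is numeric, use Seq(1, 2, 3) instead):

import org.apache.spark.sql.functions.first

val flattened = categories
  .groupBy("Name")
  .pivot("Level", Seq("1", "2", "3"))
  .agg(first("Description"))
  .withColumnRenamed("1", "Category1")
  .withColumnRenamed("2", "Category2")
  .withColumnRenamed("3", "Category3")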

On Mon, Feb 29, 2016 at 11:40 AM, Michał Zieliński <
zielinski.mich...@gmail.com> wrote:

> Hi Kevin,
>
> This should help:
>
> https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-spark.html
>
> On 29 February 2016 at 16:54, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Fellow Sparkers,
>>
>> I'm trying to "flatten" my view of data within a DataFrame, and am having
>> difficulties doing so. The DataFrame contains product information, which
>> includes multiple levels of categories (primary, secondary, etc).
>>
>> *Example Data (Raw):*
>> *Name          Level    Category*
>> Baked Code     1        Food
>> Baked Code     2        Seafood
>> Baked Code     3        Fish
>> Hockey Stick   1        Sports
>> Hockey Stick   2        Hockey
>> Hockey Stick   3        Equipment
>>
>> *Desired Data:*
>> *Name          Category1    Category2    Category3*
>> Baked Code     Food         Seafood      Fish
>> Hockey Stick   Sports       Hockey       Equipment
>>
>> *Approach:*
>> After parsing the "raw" information into two separate DataFrames (called 
>> *products
>> *and *categories*) and registering them as Spark SQL tables, I was
>> attempting to perform the following query to flatten this all into the
>> "desired data" (depicted above).
>>
>> products.registerTempTable("products")
>> categories.registerTempTable("categories")
>>
>> val productList = sqlContext.sql(
>>   " SELECT p.Name, " +
>>   " c1.Description AS Category1, " +
>>   " c2.Description AS Category2, " +
>>   " c3.Description AS Category3 " +
>>   " FROM products AS p " +
>>   "   JOIN categories AS c1 " +
>>   " ON c1.Name = p.Name AND c1.Level = '1' "
>>   "   JOIN categories AS c2 " +
>>   " ON c2.Name = p.Name AND c2.Level = '2' "
>>   "   JOIN categories AS c3 " +
>>   " ON c3.Name = p.Name AND c3.Level = '3' "
>>
>> *Issue:*
>> I get an error when running my query above, because I am not able to JOIN
>> the *categories* table more than once. Has anybody dealt with this type
>> of use case before, and if so how did you achieve the desired behavior?
>>
>> Thank you in advance for your thoughts.
>>
>> Kevin
>>
>
>


Flattening Data within DataFrames

2016-02-29 Thread Kevin Mellott
Fellow Sparkers,

I'm trying to "flatten" my view of data within a DataFrame, and am having
difficulties doing so. The DataFrame contains product information, which
includes multiple levels of categories (primary, secondary, etc).

*Example Data (Raw):*
*Name          Level    Category*
Baked Code     1        Food
Baked Code     2        Seafood
Baked Code     3        Fish
Hockey Stick   1        Sports
Hockey Stick   2        Hockey
Hockey Stick   3        Equipment

*Desired Data:*
*Name          Category1    Category2    Category3*
Baked Code     Food         Seafood      Fish
Hockey Stick   Sports       Hockey       Equipment

*Approach:*
After parsing the "raw" information into two separate DataFrames
(called *products
*and *categories*) and registering them as Spark SQL tables, I was
attempting to perform the following query to flatten this all into the
"desired data" (depicted above).

products.registerTempTable("products")
categories.registerTempTable("categories")

val productList = sqlContext.sql(
  " SELECT p.Name, " +
  " c1.Description AS Category1, " +
  " c2.Description AS Category2, " +
  " c3.Description AS Category3 " +
  " FROM products AS p " +
  "   JOIN categories AS c1 " +
  " ON c1.Name = p.Name AND c1.Level = '1' "
  "   JOIN categories AS c2 " +
  " ON c2.Name = p.Name AND c2.Level = '2' "
  "   JOIN categories AS c3 " +
  " ON c3.Name = p.Name AND c3.Level = '3' "

*Issue:*
I get an error when running my query above, because I am not able to JOIN
the *categories* table more than once. Has anybody dealt with this type of
use case before, and if so how did you achieve the desired behavior?

Thank you in advance for your thoughts.

Kevin


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
I found a helper class that I think should do the trick. Take a look at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Losses.scala

When passing the Loss, you should be able to do something like:

Losses.fromString("leastAbsoluteError")

("leastSquaresError" selects the default squared-error loss; "leastAbsoluteError"
selects absolute error.)
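
Putting it together, a minimal Scala sketch (the Java calls are analogous; the
LIBSVM path is just a placeholder for your own training data):

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.Losses
import org.apache.spark.mllib.util.MLUtils

val trainingData = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.setLoss(Losses.fromString("leastAbsoluteError"))

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)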

On Mon, Feb 29, 2016 at 10:03 AM, diplomatic Guru <diplomaticg...@gmail.com>
wrote:

> It's strange as you are correct the doc does state it. But it's
> complaining about the constructor.
>
> When I clicked on the org.apache.spark.mllib.tree.loss.AbsoluteError
> class, this is what I see:
>
>
> @Since("1.2.0")
> @DeveloperApi
> object AbsoluteError extends Loss {
>
>   /**
>* Method to calculate the gradients for the gradient boosting
> calculation for least
>* absolute error calculation.
>* The gradient with respect to F(x) is: sign(F(x) - y)
>* @param prediction Predicted label.
>* @param label True label.
>* @return Loss gradient
>*/
>   @Since("1.2.0")
>   override def gradient(prediction: Double, label: Double): Double = {
> if (label - prediction < 0) 1.0 else -1.0
>   }
>
>   override private[mllib] def computeError(prediction: Double, label:
> Double): Double = {
> val err = label - prediction
> math.abs(err)
>   }
> }
>
>
> On 29 February 2016 at 15:49, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Looks like it should be present in 1.3 at
>> org.apache.spark.mllib.tree.loss.AbsoluteError
>>
>>
>> spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/tree/loss/AbsoluteError.html
>>
>> On Mon, Feb 29, 2016 at 9:46 AM, diplomatic Guru <
>> diplomaticg...@gmail.com> wrote:
>>
>>> AbsoluteError() constructor is undefined.
>>>
>>> I'm using Spark 1.3.0, maybe it is not ready for this version?
>>>
>>>
>>>
>>> On 29 February 2016 at 15:38, Kevin Mellott <kevin.r.mell...@gmail.com>
>>> wrote:
>>>
>>>> I believe that you can instantiate an instance of the AbsoluteError
>>>> class for the *Loss* object, since that object implements the Loss
>>>> interface. For example.
>>>>
>>>> val loss = new AbsoluteError()
>>>> boostingStrategy.setLoss(loss)
>>>>
>>>> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <
>>>> diplomaticg...@gmail.com> wrote:
>>>>
>>>>> Hi Kevin,
>>>>>
>>>>> Yes, I've set the boostingStrategy like that using the example. But I'm
>>>>> not sure how to create and pass the Loss object.
>>>>>
>>>>> e.g
>>>>>
>>>>> boostingStrategy.setLoss(..);
>>>>>
>>>>> Not sure how to pass the selected Loss.
>>>>>
>>>>> How do I set the  Absolute Error in setLoss() function?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 29 February 2016 at 15:26, Kevin Mellott <kevin.r.mell...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> You can use the constructor that accepts a BoostingStrategy object,
>>>>>> which will allow you to set the tree strategy (and other hyperparameters 
>>>>>> as
>>>>>> well).
>>>>>>
>>>>>> *GradientBoostedTrees
>>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>>>>> (BoostingStrategy
>>>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>>>>>  boostingStrategy)
>>>>>>
>>>>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <
>>>>>> diplomaticg...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello guys,
>>>>>>>
>>>>>>> I think the default Loss algorithm is Squared Error for regression,
>>>>>>> but how do I change that to Absolute Error in Java.
>>>>>>>
>>>>>>> Could you please show me an example?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
Looks like it should be present in 1.3 at
org.apache.spark.mllib.tree.loss.AbsoluteError

spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/tree/loss/AbsoluteError.html

On Mon, Feb 29, 2016 at 9:46 AM, diplomatic Guru <diplomaticg...@gmail.com>
wrote:

> AbsoluteError() constructor is undefined.
>
> I'm using Spark 1.3.0, maybe it is not ready for this version?
>
>
>
> On 29 February 2016 at 15:38, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> I believe that you can instantiate an instance of the AbsoluteError class
>> for the *Loss* object, since that object implements the Loss interface.
>> For example.
>>
>> val loss = new AbsoluteError()
>> boostingStrategy.setLoss(loss)
>>
>> On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <
>> diplomaticg...@gmail.com> wrote:
>>
>>> Hi Kevin,
>>>
>>> Yes, I've set the boostingStrategy like that using the example. But I'm
>>> not sure how to create and pass the Loss object.
>>>
>>> e.g
>>>
>>> boostingStrategy.setLoss(..);
>>>
>>> Not sure how to pass the selected Loss.
>>>
>>> How do I set the  Absolute Error in setLoss() function?
>>>
>>>
>>>
>>>
>>> On 29 February 2016 at 15:26, Kevin Mellott <kevin.r.mell...@gmail.com>
>>> wrote:
>>>
>>>> You can use the constructor that accepts a BoostingStrategy object,
>>>> which will allow you to set the tree strategy (and other hyperparameters as
>>>> well).
>>>>
>>>> *GradientBoostedTrees
>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>>>> (BoostingStrategy
>>>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>>>  boostingStrategy)
>>>>
>>>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <
>>>> diplomaticg...@gmail.com> wrote:
>>>>
>>>>> Hello guys,
>>>>>
>>>>> I think the default Loss algorithm is Squared Error for regression,
>>>>> but how do I change that to Absolute Error in Java.
>>>>>
>>>>> Could you please show me an example?
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
I believe that you can instantiate an instance of the AbsoluteError class
for the *Loss* object, since that object implements the Loss interface. For
example.

val loss = new AbsoluteError()
boostingStrategy.setLoss(loss)

On Mon, Feb 29, 2016 at 9:33 AM, diplomatic Guru <diplomaticg...@gmail.com>
wrote:

> Hi Kevin,
>
> Yes, I've set the boostingStrategy like that using the example. But I'm not
> sure how to create and pass the Loss object.
>
> e.g
>
> boostingStrategy.setLoss(..);
>
> Not sure how to pass the selected Loss.
>
> How do I set the  Absolute Error in setLoss() function?
>
>
>
>
> On 29 February 2016 at 15:26, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> You can use the constructor that accepts a BoostingStrategy object, which
>> will allow you to set the tree strategy (and other hyperparameters as well).
>>
>> *GradientBoostedTrees
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/GradientBoostedTrees.html#GradientBoostedTrees(org.apache.spark.mllib.tree.configuration.BoostingStrategy)>*
>> (BoostingStrategy
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/tree/configuration/BoostingStrategy.html>
>>  boostingStrategy)
>>
>> On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru <
>> diplomaticg...@gmail.com> wrote:
>>
>>> Hello guys,
>>>
>>> I think the default Loss algorithm is Squared Error for regression, but
>>> how do I change that to Absolute Error in Java.
>>>
>>> Could you please show me an example?
>>>
>>>
>>>
>>
>


Re: [MLlib] How to set Loss to Gradient Boosted Tree in Java

2016-02-29 Thread Kevin Mellott
You can use the constructor that accepts a BoostingStrategy object, which
will allow you to set the tree strategy (and other hyperparameters as well).

*GradientBoostedTrees
*
(BoostingStrategy

 boostingStrategy)

On Mon, Feb 29, 2016 at 9:21 AM, diplomatic Guru 
wrote:

> Hello guys,
>
> I think the default Loss algorithm is Squared Error for regression, but
> how do I change that to Absolute Error in Java.
>
> Could you please show me an example?
>
>
>


Re: Spark SQL partitioned tables - check for partition

2016-02-25 Thread Kevin Mellott
If you want to see which partitions exist on disk (without manually
checking), you could write code against the Hadoop FileSystem library to
check. Is that what you are asking?

https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/fs/package-summary.html
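
A minimal sketch of that check (the table location and partition value are
placeholders):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val partitionPath = new Path("/warehouse/my_table/partCol=2016-02-25")
val partitionExists = fs.exists(partitionPath)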


On Thu, Feb 25, 2016 at 10:54 AM, Deenar Toraskar <deenar.toras...@gmail.com
> wrote:

> Kevin
>
> I meant the partitions on disk/HDFS, not the in-memory RDD/DataFrame
> partitions. If I am right, mapPartitions or foreachPartition would identify
> and operate on the in-memory partitions.
>
> Deenar
>
> On 25 February 2016 at 15:28, Kevin Mellott <kevin.r.mell...@gmail.com>
> wrote:
>
>> Once you have loaded information into a DataFrame, you can use the
>> *mapPartitions* or *foreachPartition* operations to both identify the
>> partitions and operate against them.
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
>>
>>
>> On Thu, Feb 25, 2016 at 9:24 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> How does one check for the presence of a partition in a Spark SQL
> partitioned table (saved using dataframe.write.partitionBy("partCol"), not
>>> hive compatible tables), other than physically checking the directory on
>>> HDFS or doing a count(*)  with the partition cols in the where clause ?
>>>
>>>
>>> Regards
>>> Deenar
>>>
>>
>>
>


Re: Spark SQL partitioned tables - check for partition

2016-02-25 Thread Kevin Mellott
Once you have loaded information into a DataFrame, you can use the
*mapPartitions* or *foreachPartition* operations to both identify the
partitions and operate against them.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame


On Thu, Feb 25, 2016 at 9:24 AM, Deenar Toraskar 
wrote:

> Hi
>
> How does one check for the presence of a partition in a Spark SQL
> partitioned table (saved using dataframe.write.partitionBy("partCol"), not
> hive compatible tables), other than physically checking the directory on
> HDFS or doing a count(*)  with the partition cols in the where clause ?
>
>
> Regards
> Deenar
>


Re: Network Spark Streaming from multiple remote hosts

2016-02-23 Thread Kevin Mellott
Hi Vinti,

That example is (in my opinion) more of a tutorial and not necessarily the
way you'd want to set it up for a "real world" application. I'd recommend
using something like Apache Kafka, which will allow the various hosts to
publish messages to a queue. Your Spark Streaming application then receives
messages from the queue and performs whatever processing you'd like.

http://kafka.apache.org/documentation.html#introduction
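
For reference, a minimal sketch of consuming from Kafka with the direct stream
API (this assumes the spark-streaming-kafka artifact on the classpath, the
StreamingContext (ssc) from your code, a placeholder broker list, and a topic
named "events"):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

stream.map(_._2).foreachRDD { rdd =>
  println(s"received ${rdd.count()} messages in this batch")  // placeholder processing
}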

Thanks,
Kevin

On Tue, Feb 23, 2016 at 3:13 PM, Vinti Maheshwari 
wrote:

> Hi All
>
> I wrote program for Spark Streaming in Scala. In my program, i passed
> 'remote-host' and 'remote port' under socketTextStream.
>
> And in the remote machine, i have one perl script who is calling system
> command:
>
> echo 'data_str' | nc <remote-host> <remote-port>
>
> In that way, my Spark program is able to get data, but it seems a little bit
> confusing as I have multiple remote machines which need to send data to the
> Spark machine. I wanted to know the right way of doing it. In fact, how will
> I deal with data coming from multiple hosts?
>
> For Reference, My current program:
>
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("HBaseStream")
> val sc = new SparkContext(conf)
>
> val ssc = new StreamingContext(sc, Seconds(2))
>
> val inputStream = ssc.socketTextStream(<remote-host>, <remote-port>)
> ---
> ---
>
> ssc.start()
> // Wait for the computation to terminate
> ssc.awaitTermination()
>
>   }}
>
> Thanks in advance.
>
> Regards,
> ~Vinti
>


Re: How to get progress information of an RDD operation

2016-02-23 Thread Kevin Mellott
Have you considered using the Spark Web UI to view progress on your job? It
does a very good job showing the progress of the overall job, and it also
lets you drill into the individual tasks and server activity.

On Tue, Feb 23, 2016 at 12:53 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

> How can I get progress information for an RDD operation? For example
>
>
>
> *val *lines = sc.textFile(*"c:/temp/input.txt"*)  // a RDD of millions of
> line
> lines.foreach(line => {
> handleLine(line)
> })
>
> The input.txt contains millions of lines. The entire operation takes 6
> hours. I want to print out how many lines are processed every minute so the
> user knows the progress. How can I do that?
>
>
>
> One way I am thinking of is to use accumulator, e.g.
>
>
>
>
>
> *val *lines = sc.textFile(*"c:/temp/input.txt"*)
> *val *acCount = sc.accumulator(0L)
> lines.foreach(line => {
> handleLine(line)
> acCount += 1
> }
>
> However, how can I print out the count every minute?
>
>
>
>
>
> Ningjun
>
>
>


Re: Using functional programming rather than SQL

2016-02-22 Thread Kevin Mellott
In your example, the *rs* instance should be a DataFrame object. In other
words, the result of *HiveContext.sql* is a DataFrame that you can
manipulate using *filter, map, *etc.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
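
For example, your second aggregation could be expressed directly against rs (a
sketch using the column names from your query below):

import org.apache.spark.sql.functions.{col, max}

val salesByChannel = rs
  .filter(col("TotalSales") > 0)
  .groupBy("channel_desc")
  .agg(max("TotalSales").as("SALES"))
  .orderBy(col("SALES").desc)

salesByChannel.collect.foreach(println)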


On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in Spark
>
> Now Ideally I can get the result set into a DF and work on DF to slice and
> dice the data using functional programming with filter, map, split, etc.
>
> I wanted to get some ideas on how to go about it.
>
> thanks
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND   s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> *rs.registerTempTable("tmp")*
>
>
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println)
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
>
>
>


Re: unsubscribe email

2016-02-01 Thread Kevin Mellott
Take a look at the first section on http://spark.apache.org/community.html.
You basically just need to send an email from the aliased email to
user-unsubscr...@spark.apache.org. If you cannot log into that email
directly, then I'd recommend using a mail client that allows for the
"send-as" functionality (such as Gmail
).

On Mon, Feb 1, 2016 at 4:38 PM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
> How could I unsubscribe the email e.costaalf...@studenti.unibs.it, that
> is an alias from my email e.costaalf...@unibs.it and it is registered in
> the mail list .
>
> Thanks
>
> *Eduardo Costa Alfaia*
> *PhD Student Telecommunication Engineering*
> *Università degli Studi di Brescia-UNIBS*
>
>
> Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: how to convert millisecond time to SQL timeStamp

2016-02-01 Thread Kevin Mellott
I've had pretty good success using Joda-Time for date/time manipulations
within Spark applications. You may be able to use the *DateTime* constructor
below, if you are starting with milliseconds.

DateTime

public DateTime(long instant)

Constructs an instance set to the milliseconds from 1970-01-01T00:00:00Z
using ISOChronology in the default time zone.
Parameters: instant - the milliseconds from 1970-01-01T00:00:00Z
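
A minimal sketch of that conversion (assuming joda-time is on the classpath):

import org.joda.time.DateTime

val millis = 1456050620000L              // milliseconds since the epoch
val dt = new DateTime(millis)            // rendered in the default time zone
val sqlTimestamp = new java.sql.Timestamp(dt.getMillis)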

On Mon, Feb 1, 2016 at 5:51 PM, Andy Davidson  wrote:

> What little I know about working with timestamps is based on
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
>
> Using the example of dates formatted into human friend strings ->
> timeStamps I was able to figure out how to convert Epoch times to
> timestamps. The same trick did not work for millisecond times.
>
> Any suggestions would be greatly appreciated.
>
>
> Andy
>
> Working with epoch times
> 
>
> ref: http://www.epochconverter.com/
>
> Epoch timestamp: 1456050620
>
> Timestamp in milliseconds: 1456050620000
>
> Human time (GMT): Sun, 21 Feb 2016 10:30:20 GMT
>
> Human time (your time zone): 2/21/2016, 2:30:20 AM
>
>
> # Epoch time stamp example
>
> data = [
>
>   ("1456050620", "1456050621", 1),
>
>   ("1456050622", "14560506203", 2),
>
>   ("14560506204", "14560506205", 3)]
>
> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>
> ​
>
> # convert epoch time strings in to spark timestamps
>
> df = df.select(
>
>   df.start_time.cast("long").alias("start_time"),
>
>   df.end_time.cast("long").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> ​
>
> # convert longs to timestamps
>
> df = df.select(
>
>   df.start_time.cast("timestamp").alias("start_time"),
>
>   df.end_time.cast("timestamp").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> ​
>
> root
>  |-- start_time: long (nullable = true)
>  |-- end_time: long (nullable = true)
>  |-- id: long (nullable = true)
>
> +---+---+---+
> |start_time |end_time   |id |
> +---+---+---+
> |1456050620 |1456050621 |1  |
> |1456050622 |14560506203|2  |
> |14560506204|14560506205|3  |
> +---+---+---+
>
> root
>  |-- start_time: timestamp (nullable = true)
>  |-- end_time: timestamp (nullable = true)
>  |-- id: long (nullable = true)
>
> +-+-+---+
> |start_time   |end_time |id |
> +-+-+---+
> |2016-02-21 02:30:20.0|2016-02-21 02:30:21.0|1  |
> |2016-02-21 02:30:22.0|2431-05-28 02:03:23.0|2  |
> |2431-05-28 02:03:24.0|2431-05-28 02:03:25.0|3  |
> +-+-+---+
>
>
> In [21]:
>
> # working with millisecond times
>
> data = [
>
>   ("145605062", "145605062", 1)]
>
> df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
>
> ​
>
> # convert epoch time strings in to spark timestamps
>
> df = df.select(
>
>   df.start_time.cast("long").alias("start_time"),
>
>   df.end_time.cast("long").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> ​
>
> # convert longs to timestamps
>
> df = df.select(
>
>   df.start_time.cast("timestamp").alias("start_time"),
>
>   df.end_time.cast("timestamp").alias("end_time"),
>
>   df.id)
>
> df.printSchema()
>
> df.show(truncate=False)
>
> root
>  |-- start_time: long (nullable = true)
>  |-- end_time: long (nullable = true)
>  |-- id: long (nullable = true)
>
> +-+-+---+
> |start_time   |end_time |id |
> +-+-+---+
> |145605062|145605062|1  |
> +-+-+---+
>
> root
>  |-- start_time: timestamp (nullable = true)
>  |-- end_time: timestamp (nullable = true)
>  |-- id: long (nullable = true)
>
> +--+--+---+
> |start_time|end_time  |id |
> +--+--+---+
> |48110-05-29 10:33:20.0|48110-05-29 10:33:20.0|1  |
> +--+--+---+
>
>
>


Re: Spark Distribution of Small Dataset

2016-01-28 Thread Kevin Mellott
Hi Phil,

The short answer is that there is a driver machine (which handles the
distribution of tasks and data) and a number of worker nodes (which receive
data and perform the actual tasks). That being said, certain tasks need to
be performed on the driver, because they require all of the data.

I'd recommend taking a look at the video below, which will explain this
concept in much greater detail. It also goes through an example and shows
you how to use the logging tools to understand what is happening within
your program.

https://www.youtube.com/watch?v=dmL0N3qfSc8

Thanks,
Kevin

On Thu, Jan 28, 2016 at 4:41 AM, Philip Lee  wrote:

> Hi,
>
> Simple Question about Spark Distribution of Small Dataset.
>
> Let's say I have 8 machine with 48 cores and 48GB of RAM as a cluster.
> Dataset  (format is ORC by Hive) is so small like 1GB, but I copied it to
> HDFS.
>
> 1) If spark-sql runs against the dataset distributed on HDFS across the
> machines, what happens to the job? I mean, does one machine handle the
> dataset because it is so small?
>
> 2) But the thing is, the dataset is already distributed across the machines.
> Or does each machine handle its part of the distributed dataset and send it
> to the master node?
>
> Could you explain about this in detail in a distributed way?
>
> Best,
> Phil
>
>
>
>


Re: Spark SQL . How to enlarge output rows ?

2016-01-27 Thread Kevin Mellott
I believe that *show* should work if you provide it with both the number of
rows and the truncate flag.

ex: df.show(10, false) (in PySpark: df.show(10, False))

http://spark.apache.org/docs/1.5.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show


On Wed, Jan 27, 2016 at 2:39 AM, Akhil Das 
wrote:

> Why would you want to print all rows? You can try the following:
>
> sqlContext.sql("select day_time from my_table limit
> 10").collect().foreach(println)
>
>
>
> Thanks
> Best Regards
>
> On Sun, Jan 24, 2016 at 5:58 PM, Eli Super  wrote:
>
>> Unfortunately still getting error when use .show() with `false` or
>> `False` or `FALSE`
>>
>> Py4JError: An error occurred while calling o153.showString. Trace:
>> py4j.Py4JException: Method showString([class java.lang.String, class 
>> java.lang.Boolean]) does not exist
>>  at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>  at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>  at py4j.Gateway.invoke(Gateway.java:252)
>>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>  at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Thu, Jan 21, 2016 at 4:54 PM, Spencer, Alex (Santander) <
>> alex.spen...@santander.co.uk> wrote:
>>
>>> I forgot to add this is (I think) from 1.5.0.
>>>
>>>
>>>
>>> And yeah that looks like a Python – I’m not hot with Python but it may
>>> be capitalised as False or FALSE?
>>>
>>>
>>>
>>>
>>>
>>> *From:* Eli Super [mailto:eli.su...@gmail.com]
>>> *Sent:* 21 January 2016 14:48
>>> *To:* Spencer, Alex (Santander)
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: Spark SQL . How to enlarge output rows ?
>>>
>>>
>>>
>>> Thanks Alex
>>>
>>>
>>>
>>> I get NameError
>>>
>>> NameError: name 'false' is not defined
>>>
>>> Is it because of PySpark ?
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Jan 14, 2016 at 3:34 PM, Spencer, Alex (Santander) <
>>> alex.spen...@santander.co.uk> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> Try …..show(*false*)
>>>
>>>
>>>
>>> public void show(int numRows,
>>>
>>> boolean truncate)
>>>
>>>
>>>
>>>
>>>
>>> Kind Regards,
>>>
>>> Alex.
>>>
>>>
>>>
>>> *From:* Eli Super [mailto:eli.su...@gmail.com]
>>> *Sent:* 14 January 2016 13:09
>>> *To:* user@spark.apache.org
>>> *Subject:* Spark SQL . How to enlarge output rows ?
>>>
>>>
>>>
>>>
>>>
>>> Hi
>>>
>>>
>>>
>>> After executing sql
>>>
>>>
>>>
>>> sqlContext.sql("select day_time from my_table limit 10").show()
>>>
>>>
>>>
>>> my output looks like  :
>>>
>>> ++
>>>
>>> |  day_time|
>>>
>>> ++
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:53:...|
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:51:...|
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:52:...|
>>>
>>> |2015/12/15 15:53:...|
>>>
>>> ++
>>>
>>>
>>>
>>> I'd like to get full rows
>>>
>>> Thanks !
>>>

Re: [Spark] Reading avro file in Spark 1.3.0

2016-01-25 Thread Kevin Mellott
I think that you may be looking at documentation pertaining to the more
recent versions of Spark. Try looking at the examples linked below, which
applies to the Spark 1.3 version. There aren't many Java examples, but the
code should be very similar to the Scala ones (i.e. using "load" instead of
"read" on the SQLContext).

https://github.com/databricks/spark-avro/tree/branch-1.0
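
For example, a minimal Scala sketch for Spark 1.3 with spark-avro 1.0.x (the
file path is a placeholder; the Java version uses the same load call with a
java.util.Map of options):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.load("com.databricks.spark.avro", Map("path" -> "/data/episodes.avro"))
df.printSchema()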

On Mon, Jan 25, 2016 at 4:38 AM, diplomatic Guru 
wrote:

> Hello guys,
>
> I've been trying to read avro file using Spark's DataFrame but it's
> throwing this error:
> java.lang.NoSuchMethodError:
> org.apache.spark.sql.SQLContext.read()Lorg/apache/spark/sql/DataFrameReader;
>
> This is what I've done so far:
>
> I've added the dependency to pom.xml:
>
> <dependency>
>   <groupId>com.databricks</groupId>
>   <artifactId>spark-avro_2.10</artifactId>
>   <version>1.0.0</version>
> </dependency>
>
> Java code:
>
> JavaSparkContext sc = new JavaSparkContext(sparkConf);
> SQLContext sqlContext = new SQLContext(sc);
> DataFrame df =
> sqlContext.read().format("com.databricks.spark.avro").load(args[0]);
>
> Could you please let me know what am I doing wrong?
>
> Thanks.
>
>
>
>
>


Re: Passing binding variable in query used in Data Source API

2016-01-21 Thread Kevin Mellott
Another alternative that you can consider is to use Sqoop
 to move your data from PostgreSQL to HDFS, and
then just load it into your DataFrame without needing to use JDBC drivers.
I've had success using this approach, and depending on your setup you can
easily manage/schedule this type of workflow using a tool like Oozie
.

On Thu, Jan 21, 2016 at 8:34 AM, Todd Nist  wrote:

> Hi Satish,
>
> You should be able to do something like this:
>
>val props = new java.util.Properties()
>props.put("user", username)
>props.put("password",pwd)
>props.put("driver", "org.postgresql.Drive")
>val deptNo = 10
>val where = Some(s"dept_number = $deptNo")
>val df = sqlContext.read.jdbc("jdbc:postgresql://
> 10.00.00.000:5432/db_test?user=username&password=password
> ", "
> schema.table1", Array(where.getOrElse("")), props)
>
> or just add the fillter to your query like this and I believe these should
> get pushed down.
>
>   val df = sqlContext.read
> .format("jdbc")
> .option("url", "jdbc:postgresql://
> 10.00.00.000:5432/db_test?user=username&password=password
> ")
> .option("user", username)
> .option("password", pwd)
> .option("driver", "org.postgresql.Driver")
> .option("dbtable", "schema.table1")
> .load().filter('dept_number === deptNo)
>
> This is form the top of my head and the code has not been tested or
> compiled.
>
> HTH.
>
> -Todd
>
>
> On Thu, Jan 21, 2016 at 6:02 AM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> Hi All,
>>
>> We have requirement to fetch data from source PostgreSQL database as per
>> a condition, hence need to pass a binding variable in query used in Data
>> Source API as below:
>>
>>
>> var DeptNbr = 10
>>
>> val dataSource_dF=cc.load("jdbc",Map("url"->"jdbc:postgresql://
>> 10.00.00.000:5432/db_test?user=username&password=password","driver"->"org.postgresql.Driver","dbtable"->"(select *
>> from schema.table1 where dept_number=DeptNbr) as table1"))
>>
>>
>> But it errors saying expected ';' but found '='
>>
>>
>> Note: As it is an iterative approach hence cannot use constants but need
>> to pass variable to query
>>
>>
>> If anybody had a similar implementation to pass binding variable while
>> fetching data from source database using Data Source than please provide
>> details on the same
>>
>>
>> Regards,
>>
>> Satish Chandra
>>
>
>


Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

2016-01-20 Thread Kevin Mellott
Hi Andy,

According to the API documentation for DataFrame
,
you should have access to *sqlContext* as a property off of the DataFrame
instance. In your example, you could then do something like:

df.sqlContext.udf.register(...)
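
For example, a minimal Scala sketch (the column name "text" and the stemming
logic are placeholders for your own):

val stem = (text: String) => text.toLowerCase.split("\\s+").toSeq
df.sqlContext.udf.register("stemmer", stem)

val transformed = df.selectExpr("*", "stemmer(text) AS words")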

Thanks,
Kevin

On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> For clarity callUDF() is not defined on DataFrames. It is defined on 
> org.apache.spark.sql.functions
> . Strange that the class name starts with lower case. I have not figured out
> how to use the functions class.
>
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
>
> Andy
>
> From: Andrew Davidson 
> Date: Wednesday, January 20, 2016 at 4:05 PM
> To: "user @spark" 
> Subject: trouble implementing complex transformer in java that can be
> used with Pipeline. Scala to Java porting problem
>
> I am using 1.6.0. I am having trouble implementing a custom transformer
> derived from org.apache.spark.ml.Transformer in Java that I can use in
> a PipeLine.
>
> So far the only way I figure out how to implement any kind of complex
> functionality and have it applied to a DataFrame is to implement a UDF. For
> example
>
>
>class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>
> private static final long serialVersionUID = 1L;
>
>
> @Override
>
> public List<String> call(String text) throws Exception {
>
> List<String> ret = stemText(text); // call org.apache.lucene
>
> return ret;
>
> }
>
> }
>
>
> Before I can use the UDF it needs to be registered. This requires the
> sqlContext. *The problem is sqlContext is not available during
> pipeline.load()*
>
>void registerUDF(SQLContext sqlContext) {
>
> if (udf == null) {
>
> udf = new StemmerUDF();
>
> DataType returnType = DataTypes.createArrayType(DataTypes.
> StringType);
>
> sqlContext.udf().register(udfName, udf, returnType);
>
> }
>
> }
>
>
> Our transformer needs to implement transform(). For it to be able to use
> the registered UDF we need the sqlContext. *The problem is the sqlContext
> is not part of the signature of transform.* My current hack is to pass
> the sqlContext to the constructor and not to use pipelines
>
>   @Override
>
> public DataFrame transform(DataFrame df) {
>
>   String fmt = "%s(%s) as %s";
>
> String stmt = String.format(fmt, udfName, inputCol, outputCol);
>
> logger.info("\nstmt: {}", stmt);
>
> DataFrame ret = df.selectExpr("*", stmt);
>
> return ret;
>
> }
>
>
> Is they a way to do something like df.callUDF(myUDF);
>
>
> *The following Scala code looks like it is close to what I need. I not
> been able to figure out how do something like this in Java 8. callUDF does
> not seem to be avaliable.*
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>
> @DeveloperApi
>
> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT,
> T]]
>
>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>
> . . .
>
>
>  override def transform(dataset: DataFrame): DataFrame = {
>
> transformSchema(dataset.schema, logging = true)
>
> dataset.withColumn($(outputCol),
>
>   callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>
>   }
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>
>
> class Tokenizer(override val uid: String)
>
>   extends UnaryTransformer[String, Seq[String], Tokenizer] with
> DefaultParamsWritable {
>
>
> . . .
>
>   override protected def createTransformFunc: String => Seq[String] = {
>
> _.toLowerCase.split("\\s")
>
>   }
>
> . . .
>
> }
>
>
> Kind regards
>
>
> Andy
>
>
>


Re: sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-15 Thread Kevin Mellott
Hi George,

I believe that sqlContext.cacheTable("tableName") is to be used when you
want to cache the data that is being used within a Spark SQL query. For
example, take a look at the code below.


> val myData = sqlContext.load("com.databricks.spark.csv", Map("path" ->
> "hdfs://somepath/file", "header" -> "false").toDF("col1", "col2")
>
myData.registerTempTable("myData")


Here, the usage of *cache()* will affect ONLY the *myData.select* query.

> myData.cache()

myData.select("col1", "col2").show()


Here, the usage of *cacheTable* will affect ONLY the *sqlContext.sql* query.

> sqlContext.cacheTable("myData")

sqlContext.sql("SELECT col1, col2 FROM myData").show()


Thanks,
Kevin

On Fri, Jan 15, 2016 at 7:00 AM, George Sigletos 
wrote:

> According to the documentation they are exactly the same, but in my
> queries
>
> dataFrame.cache()
>
> results in much faster execution times vs doing
>
> sqlContext.cacheTable("tableName")
>
> Is there any explanation about this? I am not caching the RDD prior to
> creating the dataframe. Using Pyspark on Spark 1.5.2
>
> Kind regards,
> George
>


Re: Multi tenancy, REST and MLlib

2016-01-15 Thread Kevin Mellott
It sounds like you may be interested in a solution that implements the Lambda
Architecture, such as Oryx2. At a high level, this gives you the ability to
request and receive information immediately (serving layer), generating the
responses using a pre-built model (speed layer). Meanwhile, that model is
constantly being updated in the background as new information becomes
available (batch layer).

An example of a pre-built model in this scenario may be a predictive model
that predicts the class of an incoming piece of data (i.e. does this
email look like SPAM or not).

On Fri, Jan 15, 2016 at 5:00 PM, feribg  wrote:

> I'm fairly new to Spark and Mllib, but i'm doing some research into multi
> tenancy of mllib based app. The idea is to provide ability to train models
> on demand with certain constraints (executor size) and then allow to serve
> predictions from those models via a REST layer.
>
> So far from my research I've gathered the following:
>
> 1) It's fairly easy to schedule training jobs and define the size of the
> executor of the job with something like spark job server or via cmd. I'd
> imagine you need separate contexts here anyways, because if theres one big
> context shared amongst different tenants, it wont allow training different
> models in parallel for the most part. So the solution here seems a context
> per tenant and training via Spark Job Server.
>
> 2) Second part seems a bit more tricky as it must expose the results of the
> trained models to the outside world via some form of API. So far I've been
> able to create a new context inside of a simple Spring REST application,
> load the persisted model and be able to call predict and return results.
>
> My main problem with this approach is that now I need to load the whole
> spark context for each single model instance and a single tenant can
> potentially have a bunch, which also means at least a JVM per tenant and
> this is quite wasteful. It seems the actual prediction part is fairly
> simple
> and I was wondering if there was a way to share multiple models to predict
> from on the same context. Would that allow parallel predictions (ie model B
> doesnt have to wait for a prediction of model A to complete in order to
> return).
>
> Given this simple scenario do you see a better approach to architect that,
> maybe I'm missing certain features of Spark that would facilitate it in a
> cleaner and more efficient manner.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-REST-and-MLlib-tp25979.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: yarn-client: SparkSubmitDriverBootstrapper not found in yarn client mode (1.6.0)

2016-01-13 Thread Kevin Mellott
Lin - if you add "--verbose" to your original *spark-submit* command, it
will let you know the location in which Spark is running. As Marcelo
pointed out, this will likely indicate version 1.3, which may help you
confirm if this is your problem.

On Wed, Jan 13, 2016 at 12:06 PM, Marcelo Vanzin 
wrote:

> SparkSubmitDriverBootstrapper was removed back in Spark 1.4, so it
> seems you have a mixbag of 1.3 / 1.6 in your path / classpath and
> things are failing because of that.
>
> On Wed, Jan 13, 2016 at 9:31 AM, Lin Zhao  wrote:
> > My job runs fine in yarn cluster mode but I have reason to use client
> mode
> > instead. But I'm hitting this error when submitting:
> >> spark-submit --class com.exabeam.martini.scripts.SparkStreamingTest
> >> --master yarn --deploy-mode client --executor-memory 90G
> --num-executors 3
> >> --executor-cores 14 Martini-assembly-0.1.jar yarn-client
> >
> > Error: Could not find or load main class
> > org.apache.spark.deploy.SparkSubmitDriverBootstrapper
> >
> >
> >  If I replace deploy-mode to cluster the job is submitted successfully.
> Is
> > there a dependency missing from my project? Right now only one I
> included is
> > spark-streaming 1.6.0.
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Hive is unable to avro file written by spark avro

2016-01-13 Thread Kevin Mellott
Hi Sivakumar,

I have run into this issue in the past, and we were able to fix it by using
an explicit schema when saving the DataFrame to the Avro file. This schema
was an exact match to the one associated with the metadata on the Hive
database table, which allowed the Hive queries to work even after updating
the underlying Avro file via Spark.

We are using Spark 1.3.0, and I was hoping to find a better solution to
this problem once we upgrade to Spark 1.5.0 (we manage versions via CDH).
This one works, but the coding involved can be a little tedious based on
the complexity of your data.

If memory serves correctly, the explicit schema was necessary because our
data structure contained optional nested properties. The DataFrame writer
will automatically create a schema for you, but ours differed based on
the data being saved (i.e. whether it did or did not contain a nested
element).
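
For reference, a minimal sketch of forcing an explicit schema before the save
(the column names and types are placeholders for whatever the Hive table
defines):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._

val hiveSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("amount", DoubleType, nullable = true)))

val dfWithSchema = sqlContext.createDataFrame(df.rdd, hiveSchema)
dfWithSchema.save("com.databricks.spark.avro", SaveMode.Append, Map("path" -> path))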

- Kevin

On Wed, Jan 13, 2016 at 7:20 PM, Siva  wrote:

> Hi Everyone,
>
> Avro data written by a DataFrame to HDFS is not able to be read by Hive. I am
> saving the data in avro format with the statement below.
>
> df.save("com.databricks.spark.avro", SaveMode.Append, Map("path" -> path))
>
> Created hive avro external table and while reading I see all nulls. Did
> anyone face similar issue, what is the best way to write the data in avro
> format from spark, so that it can also readable by hive.
>
> Thanks,
> Sivakumar Bhavanari.
>


Re: rdd join very slow when rdd created from data frame

2016-01-12 Thread Kevin Mellott
Can you please provide the high-level schema of the entities that you are
attempting to join? I think that you may be able to use a more efficient
technique to join these together; perhaps by registering the Dataframes as
temp tables and constructing a Spark SQL query.
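
For example, a hypothetical sketch of that approach (the table names and join
key are placeholders):

dataframe.registerTempTable("left_side")
otherDataframe.registerTempTable("right_side")

val joined = sqlContext.sql(
  "SELECT l.*, r.* FROM left_side l JOIN right_side r ON l.id = r.id")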

Also, which version of Spark are you using?

On Tue, Jan 12, 2016 at 4:16 PM, Koert Kuipers  wrote:

> We have a join of 2 RDDs that's fast (< 1 min), and suddenly it
> wouldn't even finish overnight anymore. The change was that the RDD was now
> derived from a DataFrame.
>
> so the new code that runs forever is something like this:
> dataframe.rdd.map(row => (Row(row(0)), row)).join(...)
>
> Any idea why?
> I imagined it had something to do with recomputing parts of the data
> frame, but even a small change like this makes the issue go away:
> dataframe.rdd.map(row => Row.fromSeq(row.toSeq)).map(row => (Row(row(0)),
> row)).join(...)
>