Re: Convert a line of String into column

2019-10-05 Thread Dhaval Modi
Hi,

First, convert "lines" to a DataFrame. You will get a single column with the
original string in each row.

Next, use a string split on this column to convert it to an array of strings.

After this, select each element of the array as its own column (the explode
function would instead turn each element into a separate row).
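
A minimal Structured Streaming sketch of that approach, assuming six
whitespace-separated fields per line (paths and column names are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

val spark = SparkSession.builder
  .master("local")
  .appName("LineToColumns")
  .getOrCreate()

// each incoming line lands in a single "value" column
val lines = spark.readStream.textFile("/tmp/data/")

// split on whitespace into an array column, then pick each element out as a column
val fields = lines.select(split(col("value"), "\\s+").as("fields"))
val table  = fields.select((0 until 6).map(i => col("fields").getItem(i).as(s"col${i + 1}")): _*)

val query = table.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()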

On Wed 2 Oct, 2019, 03:18 ,  wrote:

> I want to convert a line of String to a table. For instance, I want to
> convert the following line
>
> ...   # this is a line in a text file, separated by white space
>
> to table
>
> +----+----+----+-----+----+
> |col1|col2|col3| ... |col6|
> +----+----+----+-----+----+
> |val1|val2|val3| ... |val6|
> +----+----+----+-----+----+
>
> The code looks as below
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession
>   .builder
>   .master("local")
>   .appName("MyApp")
>   .getOrCreate()
>
> import spark.implicits._
>
> val lines = spark.readStream.textFile("/tmp/data/")
>
> val words = lines.as[String].flatMap(_.split(" "))
> words.printSchema()
>
> val query = words.
>   writeStream.
>   outputMode("append").
>   format("console").
>   start
> query.awaitTermination()
>
> But in fact this code only turns the line into a single column
>
> +-------+
> |  value|
> +-------+
> |col1...|
> |col2...|
> |col3...|
> |  ...  |
> |col6...|
> +-------+
>
> How can I achieve the effect that I want?
>
> Thanks.
>
>


Re: Stopping a Spark Streaming Context gracefully

2018-07-15 Thread Dhaval Modi
+1

Regards,
Dhaval Modi
dhavalmod...@gmail.com

On 8 November 2017 at 00:06, Bryan Jeffrey  wrote:

> Hello.
>
> I am running Spark 2.1, Scala 2.11.  We're running several Spark streaming
> jobs.  In some cases we restart these jobs on an occasional basis.  We have
> code that looks like the following:
>
> logger.info("Starting the streaming context!")
> ssc.start()
> logger.info("Waiting for termination!")
> Option(config.getInt(Parameters.RestartMinutes)).getOrElse(0) match {
>   case restartMinutes: Int if restartMinutes > 0 =>
> logger.info(s"Waiting for ${restartMinutes} before terminating job")
> ssc.awaitTerminationOrTimeout(restartMinutes * 
> DateUtils.millisecondsPerMinute)
>   case _ => ssc.awaitTermination()
> }
> logger.info("Calling 'stop'")
> ssc.stop(stopSparkContext = true, stopGracefully = true)
>
>
> In several cases we've observed jobs where we've called 'stop' not
> stopping.  I went and wrote a simple job that reads from Kafka and does
> nothing (prints a count of data).  After several minutes it simply calls
> 'ssc.stop(true, true)'.  In some cases this will stop the context.  In
> others it will not stop the context.  If we call 'stop' several times over
> an interval one of them eventually succeeds.
>
> It looks like this is a bug.  I looked in Jira and did not see an open
> issue.  Is this a  known problem?  If not I'll open a bug.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
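
For reference, one setting that is sometimes combined with the pattern above is
spark.streaming.stopGracefullyOnShutdown, which makes Spark's own shutdown hook
stop the StreamingContext gracefully when the driver JVM receives a shutdown
signal. This is only a sketch of that configuration, not a confirmed fix for the
behaviour described here; the app name and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("GracefulRestartJob")  // placeholder name
  // stop the StreamingContext gracefully from the JVM shutdown hook (e.g. on SIGTERM)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// ... build the pipeline, then ssc.start() and awaitTerminationOrTimeout as above ...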


Re: Properly stop applications or jobs within the application

2018-07-15 Thread Dhaval Modi
@sagar - YARN kill is not a reliable process for spark streaming.



Regards,
Dhaval Modi
dhavalmod...@gmail.com

On 8 March 2018 at 17:18, bsikander  wrote:

> I am running in Spark standalone mode. No YARN.
>
> Anyway, yarn application -kill is a manual process. I do not want that. I
> want to properly kill the driver/application programmatically.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Stopping StreamingContext

2018-07-15 Thread Dhaval Modi
+1

Regards,
Dhaval Modi
dhavalmod...@gmail.com

On 29 March 2018 at 19:57, Sidney Feiner  wrote:

> Hey,
>
> I have a Spark Streaming application processing some events.
>
> Sometimes, I want to stop the application if I get a specific event. I
> collect the executors' results in the driver and, based on those results, I
> stop the StreamingContext using
> StreamingContext.stop(stopSparkContext=true).
>
> When I do that, I can see in the logs that the app is shutting down,
> closing receivers etc.
>
> But when I go to the master's web UI I can still see the app under
> "Running Applications". But if I click it, it says the endpoint doesn't
> exist.
>
> When I check the open processes on the machine, I can see that the job's
> process is still running.
>
> Am I closing the application wrong?
>
>
>
> Those are the logs once I call the stop() method:
>
>
>
> 2018-03-28 11:59:04 INFO  KafkaProducer:615 - Closing the Kafka producer
> with timeoutMillis = 9223372036854775807 ms.
>
> 2018-03-28 11:59:04 INFO  ReceiverTracker:54 - Sent stop signal to all 1
> receivers
>
> 2018-03-28 11:59:05 INFO  BlockManagerInfo:54 - Added
> input-0-1522238344750 in memory on i-va-spark1:59059 (size: 1632.0 B, free:
> 579.2 MB)
>
> 2018-03-28 11:59:05 ERROR ReceiverTracker:70 - Deregistered receiver for
> stream 0: Stopped by driver
>
> 2018-03-28 11:59:05 INFO  BlockManagerInfo:54 - Added
> input-0-1522238345000 in memory on i-va-spark1:59059 (size: 272.0 B, free:
> 579.2 MB)
>
> 2018-03-28 11:59:05 INFO  TaskSetManager:54 - Finished task 0.0 in stage
> 2.0 (TID 70) in 30213 ms on i-va-spark1 (executor 0) (1/1)
>
> 2018-03-28 11:59:05 INFO  TaskSchedulerImpl:54 - Removed TaskSet 2.0,
> whose tasks have all completed, from pool
>
> 2018-03-28 11:59:05 INFO  DAGScheduler:54 - ResultStage 2 (start at
> UserLocationHistoryJob.scala:38) finished in 30.213 s
>
> 2018-03-28 11:59:05 INFO  ReceiverTracker:54 - All of the receivers have
> deregistered successfully
>
> 2018-03-28 11:59:05 INFO  ReceiverTracker:54 - ReceiverTracker stopped
>
> 2018-03-28 11:59:05 INFO  JobGenerator:54 - Stopping JobGenerator
> immediately
>
> 2018-03-28 11:59:05 INFO  RecurringTimer:54 - Stopped timer for
> JobGenerator after time 152223834
>
> 2018-03-28 11:59:05 INFO  JobGenerator:54 - Stopped JobGenerator
>
> 2018-03-28 11:59:07 INFO  JobScheduler:54 - Stopped JobScheduler
>
> 2018-03-28 11:59:07 INFO  StreamingContext:54 - StreamingContext stopped
> successfully
>
> 2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_5_piece0
> on 10.0.0.243:41976 in memory (size: 2.4 KB, free: 488.4 MB)
>
> 2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_5_piece0
> on i-va-spark1:59059 in memory (size: 2.4 KB, free: 579.2 MB)
>
> 2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_4_piece0
> on 10.0.0.243:41976 in memory (size: 23.9 KB, free: 488.4 MB)
>
> 2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_4_piece0
> on i-va-spark1:59059 in memory (size: 23.9 KB, free: 579.2 MB)
>
> 2018-03-28 11:59:37 WARN  QueuedThreadPool:178 -
> SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop
> Thread[SparkUI-171-selector-ServerConnectorManager@478b3e9/2,5,main]
>
> 2018-03-28 11:59:37 WARN  QueuedThreadPool:178 -
> SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop
> Thread[SparkUI-172-selector-ServerConnectorManager@478b3e9/3,5,main]
>
> 2018-03-28 11:59:37 WARN  QueuedThreadPool:178 -
> SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop
> Thread[SparkUI-169-selector-ServerConnectorManager@478b3e9/0,5,main]
>
> 2018-03-28 11:59:37 WARN  QueuedThreadPool:178 -
> SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop
> Thread[SparkUI-170-selector-ServerConnectorManager@478b3e9/1,5,main]
>
> 2018-03-28 13:22:01 INFO  DiskBlockManager:54 - Shutdown hook called
>
> 2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Shutdown hook called
>
> 2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Deleting directory
> /data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581
>
> 2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Deleting directory
> /data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581/userFiles-
> 8a970eec-da41-442b-9ccf-1621b9e9e045
>
>
>
>
>
>
>
> *Sidney Feiner* */* SW Developer
>
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
>
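
One pattern for this kind of event-driven shutdown is to only set a flag from
the processing code and perform the actual stop() from the main driver thread,
since stopping gracefully from inside the batch that is currently running can
hang. This is a sketch of that idea, not a diagnosis of the lingering process
above; the source, the "STOP" marker event and the intervals are placeholders:

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("EventTriggeredStop"), Seconds(10))
val stopRequested = new AtomicBoolean(false)

// placeholder source; the real job would consume its usual stream
val stream = ssc.socketTextStream("localhost", 9999)

stream.foreachRDD { rdd =>
  // driver-side check of the batch; "STOP" is a placeholder marker event
  if (rdd.collect().contains("STOP")) stopRequested.set(true)
}

ssc.start()

// poll from the main thread rather than stopping inside foreachRDD
var stopped = false
while (!stopped) {
  stopped = ssc.awaitTerminationOrTimeout(10 * 1000)
  if (!stopped && stopRequested.get()) {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
  }
}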


How to stop streaming jobs

2018-07-15 Thread Dhaval Modi
Hi Team,

I have a situation where I want to stop infinitely running Spark Streaming
jobs.

Currently the streaming jobs are configured with
"awaitTerminationOrTimeout(-1)" and are deployed in cluster mode on YARN.

I have read that YARN kill does not work well in this case.

Can you please advise on other ways to stop these jobs?


Regards,
Dhaval Modi
dhavalmod...@gmail.com
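
For reference, one approach that comes up for YARN cluster mode is to replace
awaitTerminationOrTimeout(-1) with a polling loop that watches for a "stop"
marker file on HDFS and then stops the context gracefully. A sketch of that
pattern; the marker path, source and intervals are placeholders:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("MarkerFileStop"), Seconds(30))

// placeholder pipeline; the real job reads Kafka and writes to Hive
ssc.socketTextStream("localhost", 9999).foreachRDD(rdd => println(rdd.count()))

ssc.start()

val markerPath = new Path("/tmp/streaming-stop-marker")  // placeholder HDFS path
val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)

// instead of awaitTerminationOrTimeout(-1), poll for an external stop signal
var stopped = false
while (!stopped) {
  stopped = ssc.awaitTerminationOrTimeout(60 * 1000)
  if (!stopped && fs.exists(markerPath)) {
    // marker created by an operator or scheduler: shut down gracefully
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
  }
}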


Re: Advice on multiple streaming job

2018-05-07 Thread Dhaval Modi
Hi Gerard,

Our source is Kafka, and we are using the standard streaming API (DStreams).

Our requirement: we have hundreds of Kafka topics, and each topic sends
different (complex) JSON messages. Topics are structured per domain, so each
topic is independent of the others.
These JSON messages need to be flattened and stored in Hive.

For these hundreds of topics, we currently have hundreds of jobs running
independently, each using a different UI port.



Regards,
Dhaval Modi
dhavalmod...@gmail.com

On 7 May 2018 at 13:53, Gerard Maas <gerard.m...@gmail.com> wrote:

> Dhaval,
>
> Which Streaming API are you using?
> In Structured Streaming, you are able to start several streaming queries
> within the same context.
>
> kind regards, Gerard.
>
> On Sun, May 6, 2018 at 7:59 PM, Dhaval Modi <dhavalmod...@gmail.com>
> wrote:
>
>> Hi Susan,
>>
>> Thanks for your response.
>>
>> Will try configuration as suggested.
>>
>> But still i am looking for answer does Spark support running multiple
>> jobs on the same port?
>>
>> On Sun, May 6, 2018, 20:27 Susan X. Huynh <xhu...@mesosphere.io> wrote:
>>
>>> Hi Dhaval,
>>>
>>> Not sure if you have considered this: the port 4040 sounds like a driver
>>> UI port. By default it will try up to 4056, but you can increase that
>>> number with "spark.port.maxRetries". (https://spark.apache.org/docs
>>> /latest/configuration.html) Try setting it to "32". This would help if
>>> the only conflict is among the driver UI ports (like if you have > 16
>>> drivers running on the same host).
>>>
>>> Susan
>>>
>>> On Sun, May 6, 2018 at 12:32 AM, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
>>>> Use a scheduler that abstract the network away with a CNI for instance
>>>> or other mécanismes (mesos, kubernetes, yarn). The CNI will allow to always
>>>> bind on the same ports because each container will have its own IP. Some
>>>> other solution like mesos and marathon can work without CNI , with host IP
>>>> binding, but will manage the ports for you ensuring there isn't any
>>>> conflict.
>>>>
>>>> Le sam. 5 mai 2018 à 17:10, Dhaval Modi <dhavalmod...@gmail.com> a
>>>> écrit :
>>>>
>>>>> Hi All,
>>>>>
>>>>> Need advice on executing multiple streaming jobs.
>>>>>
>>>>> Problem:- We have 100's of streaming job. Every streaming job uses new
>>>>> port. Also, Spark automatically checks port from 4040 to 4056, post that 
>>>>> it
>>>>> fails. One of the workaround, is to provide port explicitly.
>>>>>
>>>>> Is there a way to tackle this situation? or Am I missing any thing?
>>>>>
>>>>> Thanking you in advance.
>>>>>
>>>>> Regards,
>>>>> Dhaval Modi
>>>>> dhavalmod...@gmail.com
>>>>>
>>>>
>>>
>>>
>>> --
>>> Susan X. Huynh
>>> Software engineer, Data Agility
>>> xhu...@mesosphere.com
>>>
>>
>
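
For reference, Susan's suggestion in this thread translates into configuration
along these lines; the values are placeholders, while spark.port.maxRetries,
spark.ui.port and spark.ui.enabled are standard Spark settings:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("TopicFlattenerJob")              // placeholder per-topic job name
  .set("spark.port.maxRetries", "32")           // probe more UI ports beyond 4040
  .set("spark.ui.port", "4100")                 // or pin a distinct starting port per job
  //.set("spark.ui.enabled", "false")           // or disable the per-driver UI entirely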


Re: Advice on multiple streaming job

2018-05-06 Thread Dhaval Modi
Hi Susan,

Thanks for your response.

Will try configuration as suggested.

But I am still looking for an answer: does Spark support running multiple
jobs on the same port?

On Sun, May 6, 2018, 20:27 Susan X. Huynh <xhu...@mesosphere.io> wrote:

> Hi Dhaval,
>
> Not sure if you have considered this: the port 4040 sounds like a driver
> UI port. By default it will try up to 4056, but you can increase that
> number with "spark.port.maxRetries". (
> https://spark.apache.org/docs/latest/configuration.html) Try setting it
> to "32". This would help if the only conflict is among the driver UI ports
> (like if you have > 16 drivers running on the same host).
>
> Susan
>
> On Sun, May 6, 2018 at 12:32 AM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Use a scheduler that abstract the network away with a CNI for instance or
>> other mécanismes (mesos, kubernetes, yarn). The CNI will allow to always
>> bind on the same ports because each container will have its own IP. Some
>> other solution like mesos and marathon can work without CNI , with host IP
>> binding, but will manage the ports for you ensuring there isn't any
>> conflict.
>>
>> Le sam. 5 mai 2018 à 17:10, Dhaval Modi <dhavalmod...@gmail.com> a
>> écrit :
>>
>>> Hi All,
>>>
>>> Need advice on executing multiple streaming jobs.
>>>
>>> Problem:- We have 100's of streaming job. Every streaming job uses new
>>> port. Also, Spark automatically checks port from 4040 to 4056, post that it
>>> fails. One of the workaround, is to provide port explicitly.
>>>
>>> Is there a way to tackle this situation? or Am I missing any thing?
>>>
>>> Thanking you in advance.
>>>
>>> Regards,
>>> Dhaval Modi
>>> dhavalmod...@gmail.com
>>>
>>
>
>
> --
> Susan X. Huynh
> Software engineer, Data Agility
> xhu...@mesosphere.com
>


Re: Advice on multiple streaming job

2018-05-06 Thread Dhaval Modi
Hi vincent,

Thanks for your response.

We are using YARN, and CNI may not be possible.

Thanks & Regards,
Dhaval


On Sun, May 6, 2018, 13:02 vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Use a scheduler that abstract the network away with a CNI for instance or
> other mécanismes (mesos, kubernetes, yarn). The CNI will allow to always
> bind on the same ports because each container will have its own IP. Some
> other solution like mesos and marathon can work without CNI , with host IP
> binding, but will manage the ports for you ensuring there isn't any
> conflict.
>
> Le sam. 5 mai 2018 à 17:10, Dhaval Modi <dhavalmod...@gmail.com> a écrit :
>
>> Hi All,
>>
>> Need advice on executing multiple streaming jobs.
>>
>> Problem:- We have 100's of streaming job. Every streaming job uses new
>> port. Also, Spark automatically checks port from 4040 to 4056, post that it
>> fails. One of the workaround, is to provide port explicitly.
>>
>> Is there a way to tackle this situation? or Am I missing any thing?
>>
>> Thanking you in advance.
>>
>> Regards,
>> Dhaval Modi
>> dhavalmod...@gmail.com
>>
>


Advice on multiple streaming job

2018-05-05 Thread Dhaval Modi
Hi All,

Need advice on executing multiple streaming jobs.

Problem: we have hundreds of streaming jobs, and every streaming job uses a
new port. Spark automatically tries ports from 4040 to 4056, and beyond that
it fails. One workaround is to provide the port explicitly.

Is there a way to tackle this situation, or am I missing anything?

Thanking you in advance.

Regards,
Dhaval Modi
dhavalmod...@gmail.com


Re: How to set NameSpace while storing from Spark to HBase using saveAsNewAPIHadoopDataSet

2016-12-19 Thread Dhaval Modi
Use "namespace:tablename" as the output table name, i.e. separate the
namespace from the table name with ":".

Regards,
Dhaval Modi
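
That is, the namespace goes into the same TableOutputFormat.OUTPUT_TABLE
setting, separated from the table name by a colon. A minimal adaptation of the
configuration from the quoted code below ("my_namespace" is a placeholder and
must already exist in HBase):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

val conf = HBaseConfiguration.create()
// the namespace is carried in the same setting, as "namespace:tablename"
conf.set(TableOutputFormat.OUTPUT_TABLE, "my_namespace:stream_count")
conf.set("hbase.zookeeper.quorum", "localhost:2181")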

On 19 December 2016 at 13:10, Rabin Banerjee <dev.rabin.baner...@gmail.com>
wrote:

> HI All,
>
>   I am trying to save data from Spark into HBase using saveHadoopDataSet
> API . Please refer the below code . Code is working fine .But the table is
> getting stored in the default namespace.how to set the NameSpace in the
> below code?
>
>
>
>
> wordCounts.foreachRDD ( rdd => {
>   val conf = HBaseConfiguration.create()
>   conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
>   conf.set("hbase.zookeeper.quorum", "localhost:2181")
>   conf.set("hbase.master", "localhost:6");
>   conf.set("hbase.rootdir", "file:///tmp/hbase")
>
>   val jobConf = new Configuration(conf)
>   jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
>   jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].
> getName)
>   jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[
> Text]].getName)
>
>   rdd.saveAsNewAPIHadoopDataset(jobConf)
> })
>
> Regards,
> R Banerjee
>


Re: This simple UDF is not working!

2016-03-26 Thread Dhaval Modi
Hi Mich,

You can try this (imports shown for completeness):

import java.sql.Date
import java.text.SimpleDateFormat
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.functions.{udf, lit}

val toDate = udf { (out: String, form: String) =>
  val format = new SimpleDateFormat(form)
  Try(new Date(format.parse(out).getTime)) match {
    case Success(t) => Some(t)
    case Failure(_) => None
  }
}

Usage:

src = src.withColumn(s"$columnName", toDate(src(s"$columnName"), lit("dd/MM/yyyy")))


Regards,
Dhaval Modi
dhavalmod...@gmail.com
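
For reference, the same conversion can also be done without a UDF by combining
the built-in column functions that Ted points to further down the thread. A
minimal sketch, assuming a DataFrame df with a string column "paymentdate" in
"dd/MM/yyyy" format:

import org.apache.spark.sql.functions.{col, from_unixtime, to_date, unix_timestamp}

// parse the string and produce a proper DateType column
val withDate = df.withColumn(
  "paymentdate",
  to_date(from_unixtime(unix_timestamp(col("paymentdate"), "dd/MM/yyyy")))
)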

On 26 March 2016 at 04:34, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Ted,
>
> I decided to take a short cut here. I created the map leaving date as it
> is (p(1)) below
>
> def CleanupCurrency (word : String) : Double = {
> return word.toString.substring(1).replace(",", "").toDouble
> }
> sqlContext.udf.register("CleanupCurrency", CleanupCurrency(_:String))
> val a = df.filter(col("Total") > "").map(p => Invoices(p(0).toString,
> p(1).toString, CleanupCurrency(p(2).toString),
> CleanupCurrency(p(3).toString), CleanupCurrency(p(4).toString)))
>
> //
> // convert this RDD to DF and create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
>
> INSERT INTO TABLE 
> SELECT
>   INVOICENUMBER
> ,
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/'),'-MM-dd'))
> AS paymentdate
> , NET
> , VAT
> , TOTAL
> FROM tmp
> """
> sql(sqltext)
>
> That works OK.
>
> If I want to find invoices with paymentdate > 6 months old I do
>
> sql("SELECT invoicenumber, paymentdate FROM test.t14 *WHERE
> months_between(FROM_unixtime(unix_timestamp(), '-MM-dd'), paymentdate)*
> > 6 ORDER BY invoicenumber, paymentdate").collect.foreach(println)
> [360,2014-02-10]
> [361,2014-02-17]
>
> I still interested if I could do it using a UDF :)
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 25 March 2016 at 17:44, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Mich:
>> Please take a look at:
>> sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
>>
>> test("function to_date") {
>>
>> Remember to:
>> import org.apache.spark.sql.functions._
>>
>> On Fri, Mar 25, 2016 at 7:59 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> This works with sql
>>>
>>> sqltext = """
>>> INSERT INTO TABLE t14
>>> SELECT
>>>   INVOICENUMBER
>>> ,
>>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/'),'-MM-dd'))
>>> AS paymentdate
>>> , NET
>>> , VAT
>>> , TOTAL
>>> FROM tmp
>>> """
>>> sql(sqltext)
>>>
>>>
>>> but not in UDF.  I want to convert it to correct date format  before
>>> writing it to table
>>>
>>> Thanks
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 25 March 2016 at 14:54, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Do you mind showing body of TO_DATE() ?
>>>>
>>>> Thanks
>>>>
>>>> On Fri, Mar 25, 2016 at 7:38 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Looks like you forgot an import for Date.
>>>>>
>>>>> FYI
>>>>>
>>>>> On Fri, Mar 25, 2016 at 7:36 AM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> writing a UDF to convert  a string into Date
>>>>>>
>>>>>> def ChangeDate(word : String) : Date = {
>>>>>>  | return
>>>>>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(word),"dd/MM/"),"-MM-dd")
>>>>>>  | }
>>>>>> :19: error: not found: type Date
>>>>>>
>>>>>> That code to_date.. works OK in sql but not here. It is complaining
>>>>>> about to_date?
>>>>>>
>>>>>> Any ideas will be appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * 
>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: What is the most efficient and scalable way to get all the recommendation results from ALS model ?

2016-03-20 Thread Dhaval Modi
+1
On Mar 21, 2016 09:52, "Hiroyuki Yamada"  wrote:

> Could anyone give me some advices or recommendations or usual ways to do
> this ?
>
> I am trying to get all (probably top 100) product recommendations for each
> user from a model (MatrixFactorizationModel),
> but I haven't figured out yet to do it efficiently.
>
> So far,
> calling predict (predictAll in pyspark) method with user-product matrix
> uses too much memory and couldn't complete due to a lack of memory,
> and
> calling predict for each user (or for each some users like 100 uses or so)
> takes too much time to get all the recommendations.
>
> I am using spark 1.4.1 and running 5-node cluster with 8GB RAM each.
> I only use small-sized data set so far, like about 5 users and 5000
> products with only about 10 ratings.
>
> Thanks.
>
>
> On Sat, Mar 19, 2016 at 7:58 PM, Hiroyuki Yamada 
> wrote:
>
>> Hi,
>>
>> I'm testing Collaborative Filtering with Milib.
>> Making a model by ALS.trainImplicit (or train) seems scalable as far as I
>> have tested,
>> but I'm wondering how I can get all the recommendation results
>> efficiently.
>>
>> The predictAll method can get all the results,
>> but it needs the whole user-product matrix in memory as an input.
>> So if there are 1 million users and 1 million products, then the number
>> of elements is too large (1 million x 1 million)
>> and the amount of memory to hold them is more than a few TB even when the
>> element size in only 4B,
>> which is not a realistic size of memory even now.
>>
>> # (1,000,000 * 1,000,000) * 4 / 1000 / 1000 / 1000 / 1000 => nearly equals 4 TB
>>
>> We can, of course, use predict method per user,
>> but, as far as I tried, it is very slow to get 1 million users' results.
>>
>> Do I miss something ?
>> Are there any other better ways to get all the recommendation results in
>> scalable and efficient way ?
>>
>> Best regards,
>> Hiro
>>
>>
>>
>
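
For reference, MatrixFactorizationModel also has a recommendProductsForUsers(num)
method (added around Spark 1.4, if memory serves) that computes the top-num
products for every user as a distributed RDD, avoiding both the full in-memory
user-product matrix and the per-user predict loop. A minimal sketch:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

// given a trained model (e.g. from ALS.trainImplicit), top-k products per user
def topRecommendations(model: MatrixFactorizationModel, k: Int): RDD[(Int, Array[Rating])] =
  model.recommendProductsForUsers(k)

// e.g. topRecommendations(model, 100).saveAsTextFile("/tmp/recommendations")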


Re: Spark SQL drops the HIVE table in "overwrite" mode while writing into table

2016-03-06 Thread Dhaval Modi
Hi Gourav,

I am trying to overwrite an existing managed/internal table.

I haven't registered the dataframe, so it's not a temporary table. BTW, I have
added the code to the JIRA as a comment.

Thanks,
Dhaval
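
For reference, a workaround that is often suggested for this class of problem
(overwriting a table that the DataFrame's lineage still reads from) is to
materialize the result into a staging table first and only then overwrite the
target. This is only a sketch of that idea, not a fix for SPARK-13699 itself;
tgt_table_staging is a placeholder name and sqlContext is the HiveContext from
the original job:

import org.apache.spark.sql.SaveMode

// 1. write the result somewhere the target table's files are not involved
tgtFinal.write.mode(SaveMode.Overwrite).saveAsTable("tgt_table_staging")

// 2. re-read the staging table and overwrite the real target from it
sqlContext.table("tgt_table_staging")
  .write.mode(SaveMode.Overwrite)
  .saveAsTable("tgt_table")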
On Mar 6, 2016 17:07, "Gourav Sengupta" <gourav.sengu...@gmail.com> wrote:

> hi,
>
> is the table that you are trying to overwrite an external table or
> temporary table created in hivecontext?
>
>
> Regards,
> Gourav Sengupta
>
> On Sat, Mar 5, 2016 at 3:01 PM, Dhaval Modi <dhavalmod...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> I am facing a issue while writing dataframe back to HIVE table.
>>
>> When using "SaveMode.Overwrite" option the table is getting dropped and
>> Spark is unable to recreate it thus throwing error.
>>
>> JIRA: https://issues.apache.org/jira/browse/SPARK-13699
>>
>>
>> E.g.
>> tgtFinal.write.mode(SaveMode.Overwrite).saveAsTable("tgt_table")
>>
>> Error:
>> ++
>> 16/03/05 13:57:26 INFO spark.SparkContext: Created broadcast 138 from run
>> at ThreadPoolExecutor.java:1145
>> 16/03/05 13:57:26 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>
>> *java.lang.RuntimeException: serious problem*
>> *at *
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>> at
>> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> 
>> Caused by: java.util.concurrent.ExecutionException:
>> java.io.FileNotFoundException: File does not exist: hdfs://
>> sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:188)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
>> ... 138 more
>> *Caused by: java.io.FileNotFoundException: File does not exist:
>> hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
>> <http://sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table>*
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
>> ++
>>
>>
>> Regards,
>> Dhaval Modi
>> dhavalmod...@gmail.com
>>
>
>


Spark SQL drops the HIVE table in "overwrite" mode while writing into table

2016-03-05 Thread Dhaval Modi
Hi Team,

I am facing an issue while writing a dataframe back to a HIVE table.

When using the "SaveMode.Overwrite" option, the table is getting dropped and
Spark is unable to recreate it, thus throwing an error.

JIRA: https://issues.apache.org/jira/browse/SPARK-13699


E.g.
tgtFinal.write.mode(SaveMode.Overwrite).saveAsTable("tgt_table")

Error:
++
16/03/05 13:57:26 INFO spark.SparkContext: Created broadcast 138 from run
at ThreadPoolExecutor.java:1145
16/03/05 13:57:26 INFO log.PerfLogger: 
*java.lang.RuntimeException: serious problem*
*at *
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)

Caused by: java.util.concurrent.ExecutionException:
java.io.FileNotFoundException: File does not exist: hdfs://
sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
... 138 more
*Caused by: java.io.FileNotFoundException: File does not exist:
hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
<http://sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table>*
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
++++++


Regards,
Dhaval Modi
dhavalmod...@gmail.com