Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread Gourav Sengupta
Hi,

Michael's answer will solve the problem in case you are using only a SQL-based
solution.

Otherwise please refer to the wonderful details mentioned here:
https://spark.apache.org/docs/latest/job-scheduling.html. With EMR 5.3.0
released, Spark 2.1.0 is now available in AWS.

(note that there is an issue with using Zeppelin on it; I have raised it
with AWS and they are looking into it now)
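For what it's worth, a rough sketch (untested; the app name and toy workload are illustrative, not from this thread) of what running several on-demand jobs inside a single SparkContext looks like. With the default spark.scheduler.mode of FIFO, each job runs in submission order, which is the behaviour asked about below:

import org.apache.spark.{SparkConf, SparkContext}

object SharedContextApp {
  def main(args: Array[String]): Unit = {
    // One application, one context; spark.scheduler.mode defaults to FIFO,
    // so jobs submitted to this context run in submission order.
    val sc = new SparkContext(new SparkConf().setAppName("shared-context-demo"))

    // Each on-demand request becomes one more job on the same context.
    def runJob(requestId: Int): Long =
      sc.parallelize(1 to 4, 5).map(_.toString).count()

    (1 to 3).foreach(id => println(s"request $id -> ${runJob(id)} records"))
    sc.stop()
  }
}

The missing piece is how requests reach that long-running JVM, which is exactly what Livy, spark-jobserver, or a custom Akka/HTTP front end (mentioned later in this thread) provide.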

Regards,
Gourav Sengupta

On Tue, Feb 7, 2017 at 10:37 PM, Michael Segel 
wrote:

> Why couldn’t you use the spark thrift server?
>
>
> On Feb 7, 2017, at 1:28 PM, Cosmin Posteuca 
> wrote:
>
> answer for Gourav Sengupta
>
> I want to use same spark application because i want to work as a FIFO
> scheduler. My problem is that i have many jobs(not so big) and if i run an
> application for every job my cluster will split resources as a FAIR
> scheduler(it's what i observe, maybe i'm wrong) and exist the possibility
> to create bottleneck effect. The start time isn't a problem for me, because
> it isn't a real-time application.
>
> I need a business solution, that's the reason why i can't use code from
> github.
>
> Thanks!
>
> 2017-02-07 19:55 GMT+02:00 Gourav Sengupta :
>
>> Hi,
>>
>> May I ask the reason for using the same spark application? Is it because
>> of the time it takes in order to start a spark context?
>>
>> On another note you may want to look at the number of contributors in a
>> github repo before choosing a solution.
>>
>>
>> Regards,
>> Gourav
>>
>> On Tue, Feb 7, 2017 at 5:26 PM, vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> Spark jobserver or Livy server are the best options for pure technical
>>> API.
>>> If you want to publish business API you will probably have to build you
>>> own app like the one I wrote a year ago https://github.com/elppc/akka-
>>> spark-experiments
>>> It combines Akka actors and a shared Spark context to serve concurrent
>>> subsecond jobs
>>>
>>>
>>> 2017-02-07 15:28 GMT+01:00 ayan guha :
>>>
 I think you are loking for livy or spark  jobserver

 On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca <
 cosmin.poste...@gmail.com> wrote:

> I want to run different jobs on demand with same spark context, but i
> don't know how exactly i can do this.
>
> I try to get current context, but seems it create a new spark
> context(with new executors).
>
> I call spark-submit to add new jobs.
>
> I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance),
> with yarn as resource manager.
>
> My code:
>
> val sparkContext = SparkContext.getOrCreate()
> val content = 1 to 4
> val result = sparkContext.parallelize(content, 5)
> result.map(value => value.toString).foreach(loop)
>
> def loop(x: String): Unit = {
>for (a <- 1 to 3000) {
>
>}
> }
>
> spark-submit:
>
> spark-submit --executor-cores 1 \
>  --executor-memory 1g \
>  --driver-memory 1g \
>  --master yarn \
>  --deploy-mode cluster \
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.shuffle.service.enabled=true \
>  --conf spark.dynamicAllocation.minExecutors=1 \
>  --conf spark.dynamicAllocation.maxExecutors=3 \
>  --conf spark.dynamicAllocation.initialExecutors=3 \
>  --conf spark.executor.instances=3 \
>
> If i run twice spark-submit it create 6 executors, but i want to run
> all this jobs on same spark application.
>
> How can achieve adding jobs to an existing spark application?
>
> I don't understand why SparkContext.getOrCreate() don't get existing
> spark context.
>
>
> Thanks,
>
> Cosmin P.
>
 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>
>
>


[Spark 2.0.0] java.util.concurrent.TimeoutException while writing to mongodb from Spark

2017-02-07 Thread Palash Gupta
Hi All,
I'm writing a data frame to MongoDB using Stratio/Spark-MongoDB.
Initially it was working fine, but when the data volume got high it started
giving me the error in the subject line; details are as follows.

Could anybody help me out, or suggest what solution I should apply or
how I can increase the timeout value?

My cluster setup:
The driver and executor are running in the same VM - local[5] mode
spark.driver.memory 50g
MongoDB: 3.2.10
Imported package: --packages com.stratio.datasource:spark-mongodb_2.11:0.12.0

Details Log:
17/02/08 07:03:51 INFO scheduler.DAGScheduler: Job 93 failed: foreachPartition at MongodbDataFrame.scala:37, took 39.026989 s
17/02/08 07:03:51 INFO executor.Executor: Finished task 182.0 in stage 253.0 (TID 25297). 60483 bytes result sent to driver
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 185.0 in stage 253.0 (TID 25300)
17/02/08 07:03:51 INFO scheduler.TaskSetManager: Finished task 182.0 in stage 253.0 (TID 25297) in 3797 ms on localhost (183/200)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 185.0 in stage 253.0 (TID 25300, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 186.0 in stage 253.0 (TID 25301)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 186.0 in stage 253.0 (TID 25301, localhost): TaskKilled (killed intentionally)
[INFO] [02/08/2017 07:03:51.283] [mongodbClientFactory-akka.actor.default-dispatcher-4] [akka://mongodbClientFactory/deadLetters] Message [com.stratio.datasource.mongodb.client.MongodbClientActor$ClientResponse] from Actor[akka://mongodbClientFactory/user/mongoConnectionActor#1265577515] to Actor[akka://mongodbClientFactory/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 7 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 187.0 in stage 253.0 (TID 25302)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 187.0 in stage 253.0 (TID 25302, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 183.0 in stage 253.0 (TID 25298)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 253.0 (TID 25298, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 253.0, whose tasks have all completed, from pool
Traceback (most recent call last):
  File "/home/hadoop/development/myprogram/datareload_myprogram.py", line 1188, in 
    datareporcessing(expected_datetime,expected_directory_hdfs,sqlContext)
  File "/home/hadoop/development/myprogram/datareload_nokialte.py", line 935, in datareporcessing
    df_nokia_myprogram_kpi_ready_raw.write.format("com.stratio.datasource.mongodb").mode('append').options(host='10.15.187.74:27017', credentials='parsdev,parsdb,', database='DB', collection='MY_G_N_LN_HR', connectionsTime='30', updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL').save()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 530, in save
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o839.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 184 in stage 253.0 failed 1 times, most recent failure: Lost task 184.0 in stage 253.0 (TID 25299, localhost): java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at 

Re: does persistence required for single action ?

2017-02-07 Thread Jörn Franke
Depends on the use case, but a persist before checkpointing can make sense 
after some of the map steps.
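For reference, a minimal sketch of the quoted workflow with a persist on rdd1 (the filter/map function names are copied from the question; the foreach body is a placeholder). Without the persist, the input is scanned once per branch, because rdd2 and rdd3 each carry their own lineage back to rdd1, even though there is only a single action:

import org.apache.spark.storage.StorageLevel

val rdd1 = sc.textFile(input).persist(StorageLevel.MEMORY_ONLY) // shared by both branches
val rdd2 = rdd1.filter(filterfunc1)
val rdd3 = rdd1.filter(fiterfunc2)
val rdd4 = rdd2.map(mapptrans1)
val rdd5 = rdd3.map(maptrans2)
val rdd6 = rdd4.union(rdd5)
rdd6.foreach(someSideEffect)   // single action, but rdd1 is read by two lineages
rdd1.unpersist()               // drop the cached blocks once the job is done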

> On 8 Feb 2017, at 03:09, Shushant Arora  wrote:
> 
> Hi
> 
> I have a workflow like below:
> 
> rdd1 = sc.textFile(input);
> rdd2 = rdd1.filter(filterfunc1);
> rdd3 = rdd1.filter(fiterfunc2);
> rdd4 = rdd2.map(mapptrans1);
> rdd5 = rdd3.map(maptrans2);
> rdd6 = rdd4.union(rdd5);
> rdd6.foreach(some transformation);
> 
> 
> 
> 
> 
> Do I need to persist rdd1 ?Or its not required since there is only one action 
> at rdd6 which will create only one job and in a single job no need of persist 
> ?
> Also what if transformation on rdd2 is reduceByKey instead of map ? Will this 
> again the same thing no need of persist since single job.
> 
> Thanks


Re: Dynamic resource allocation to Spark on Mesos

2017-02-07 Thread Sun Rui
Ji Yan,

We have been using Spark on Mesos with dynamic allocation enabled, which works 
and improves the overall cluster utilization.

In terms of job, do you mean jobs inside a Spark application or jobs among 
different applications? Maybe you can read
http://spark.apache.org/docs/latest/job-scheduling.html for help.
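For reference, roughly the configuration involved (values are illustrative only; on Mesos, dynamic allocation also relies on the external shuffle service running on each agent):

import org.apache.spark.SparkConf

// Illustrative values only.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")          // external shuffle service on each agent
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")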

> On Jan 31, 2017, at 03:34, Michael Gummelt  wrote:
> 
> 
> 
> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  > wrote:
> Tasks begin scheduling as soon as the first executor comes up
> 
> Thanks all for the clarification. Is this the default behavior of Spark on 
> Mesos today? I think this is what we are looking for because sometimes a job 
> can take up lots of resources and later jobs could not get all the resources 
> that it asks for. If a Spark job starts with only a subset of resources that 
> it asks for, does it know to expand its resources later when more resources 
> become available?
> 
> Yes.
>  
> 
> Launch each executor with at least 1GB RAM, but if mesos offers 2GB at some 
> moment, then launch an executor with 2GB RAM
> 
> This is less useful in our use case. But I am also quite interested in cases 
> in which this could be helpful. I think this will also help with overall 
> resource utilization on the cluster if when another job starts up that has a 
> hard requirement on resources, the extra resources to the first job can be 
> flexibly re-allocated to the second job. 
> 
> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt  > wrote:
> We've talked about that, but it hasn't become a priority because we haven't 
> had a driving use case.  If anyone has a good argument for "variable" 
> resource allocation like this, please let me know.
> 
> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin  > wrote:
> An alternative behavior is to launch the job with the best resource offer 
> Mesos is able to give
> 
> Michael has just made an excellent explanation about dynamic allocation 
> support in mesos. But IIUC, what you want to achieve is something like (using 
> RAM as an example) : "Launch each executor with at least 1GB RAM, but if 
> mesos offers 2GB at some moment, then launch an executor with 2GB RAM".
> 
> I wonder what's benefit of that? To reduce the "resource fragmentation"?
> 
> Anyway, that is not supported at this moment. In all the supported cluster 
> managers of spark (mesos, yarn, standalone, and the up-to-coming spark on 
> kubernetes), you have to specify the cores and memory of each executor.
> 
> It may not be supported in the future, because only mesos has the concepts of 
> offers because of its two-level scheduling model.
> 
> 
> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  > wrote:
> Dear Spark Users,
> 
> Currently is there a way to dynamically allocate resources to Spark on Mesos? 
> Within Spark we can specify the CPU cores, memory before running job. The way 
> I understand is that the Spark job will not run if the CPU/Mem requirement is 
> not met. This may lead to decrease in overall utilization of the cluster. An 
> alternative behavior is to launch the job with the best resource offer Mesos 
> is able to give. Is this possible with the current implementation?
> 
> Thanks
> Ji
> 
> The information in this email is confidential and may be legally privileged. 
> It is intended solely for the addressee. Access to this email by anyone else 
> is unauthorized. If you are not the intended recipient, any disclosure, 
> copying, distribution or any action taken or omitted to be taken in reliance 
> on it, is prohibited and may be unlawful.
> 
> 
> 
> 
> 
> -- 
> Michael Gummelt
> Software Engineer
> Mesosphere
> 
> 
> The information in this email is confidential and may be legally privileged. 
> It is intended solely for the addressee. Access to this email by anyone else 
> is unauthorized. If you are not the intended recipient, any disclosure, 
> copying, distribution or any action taken or omitted to be taken in reliance 
> on it, is prohibited and may be unlawful.
> 
> 
> 
> 
> -- 
> Michael Gummelt
> Software Engineer
> Mesosphere



does persistence required for single action ?

2017-02-07 Thread Shushant Arora
Hi

I have a workflow like below:

rdd1 = sc.textFile(input);
rdd2 = rdd1.filter(filterfunc1);
rdd3 = rdd1.filter(fiterfunc2);
rdd4 = rdd2.map(mapptrans1);
rdd5 = rdd3.map(maptrans2);
rdd6 = rdd4.union(rdd5);
rdd6.foreach(some transformation);

[image: Inline image 1]




   1. Do I need to persist rdd1? Or is it not required, since there is only
   one action at rdd6, which will create only one job, and in a single job there
   is no need to persist?
   2. Also, what if the transformation on rdd2 is reduceByKey instead of map?
   Is it again the same - no need to persist since it is a single job?


Thanks


Re: Resource Leak in Spark Streaming

2017-02-07 Thread Nipun Arora
I located the issue:

Having the following seems to be necessary in the pool object to make it
serializable:

private transient ConcurrentLinkedQueue> pool;

However, this means open connections cannot be re-used across subsequent
micro-batches, since transient fields are not restored on deserialization. How
can we get around this problem?
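One common workaround (a rough Scala sketch; the broker address, serializers and pool size are placeholders) is to keep the pool in a per-JVM singleton that is created lazily on each executor, rather than trying to serialize it from the driver:

import java.util.Properties
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerPool {
  // Initialized lazily, once per executor JVM, the first time a task touches it.
  lazy val pool: ConcurrentLinkedQueue[KafkaProducer[String, String]] = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092") // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val q = new ConcurrentLinkedQueue[KafkaProducer[String, String]]()
    (1 to 4).foreach(_ => q.add(new KafkaProducer[String, String](props)))
    q
  }
}

// Usage inside the stream, e.g.:
// dstream.foreachRDD { rdd => rdd.foreachPartition { records =>
//   val producer = ProducerPool.pool.poll()
//   records.foreach(r => /* producer.send(...) */ ())
//   ProducerPool.pool.add(producer)
// }}

Because nothing is shipped from the driver, the transient/serialization question goes away, and the same pool instance survives across micro-batches within each executor.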


Thanks,

Nipun

On Tue, Feb 7, 2017 at 6:35 PM, Nipun Arora 
wrote:

> Ryan,
>
> Apologies for coming back so late, I created a github repo to resolve
> this problem. On trying your solution for making the pool a Singleton,
> I get a null pointer exception in the worker.
> Do you have any other suggestions, or a simpler mechanism for handling
> this?
>
> I have put all the current code which was forked from an existing git repo
> here:
> https://github.com/nipunarora/Spark-Kafka-Writer
>
> There does seem to be duplicate creation of Kafka Writers in every
> micro-batch.
>
> Thanks
> Nipun
>
> P.S the version I shared before was writing JavaDStream, the
> one in the github project writes JavaDStream>
>


Re: Resource Leak in Spark Streaming

2017-02-07 Thread Nipun Arora
Ryan,

Apologies for coming back so late, I created a github repo to resolve
this problem. On trying your solution for making the pool a Singleton,
I get a null pointer exception in the worker.
Do you have any other suggestions, or a simpler mechanism for handling this?

I have put all the current code which was forked from an existing git repo here:
https://github.com/nipunarora/Spark-Kafka-Writer

There does seem to be duplicate creation of Kafka Writers in every micro-batch.

Thanks
Nipun

P.S the version I shared before was writing JavaDStream, the
one in the github project writes JavaDStream>




Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Amit Sela
I'm updating the Broadcast between batches, but I've ended up doing it in a
listener, thanks!

On Wed, Feb 8, 2017 at 12:31 AM Tathagata Das 
wrote:

> broadcasts are not saved in checkpoints. so you have to save it externally
> yourself, and recover it before restarting the stream from checkpoints.
>
> On Tue, Feb 7, 2017 at 3:55 PM, Amit Sela  wrote:
>
> I know this approach, only thing is, it relies on the transformation being
> an RDD transfomration as well and so could be applied via foreachRDD and
> using the rdd context to avoid a stale context after recovery/resume.
> My question is how to void stale context in a DStream-only transformation
> such as updateStateByKey / mapWithState ?
>
> On Tue, Feb 7, 2017 at 9:19 PM Shixiong(Ryan) Zhu 
> wrote:
>
> It's documented here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
>
> On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela  wrote:
>
> Hi all,
>
> I was wondering if anyone ever used a broadcast variable within
> an updateStateByKey op. ? Using it is straight-forward but I was wondering
> how it'll work after resuming from checkpoint (using the rdd.context()
> trick is not possible here) ?
>
> Thanks,
> Amit
>
>
>
>


Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust 
wrote:

> I think the fastest way is likely to use a combination of conditionals
> (when / otherwise), first (ignoring nulls), while grouping by the id.
> This should get the answer with only a single shuffle.
>
> Here is an example.
>

Very cool! Using the simpler aggregates feels cleaner.


>
> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski  wrote:
>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>
Maybe it felt like I was unnecessarily grouping-by twice, but probably
mostly that I hadn't used pivot before.

Interestingly, the physical plans are not especially different between
these two solutions after the rank column is added. They both have two
SortAggregates that seem to be figuring out where to put results based on
the rank:

My original one:

== Physical Plan ==
*Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
+- SortAggregate(key=[id#279,name#280], functions=[first(if ((cast(rank#292
as double) = 1.0)) temp_struct#312 else null, true),first(if
((cast(rank#292 as double) = 2.0)) temp_struct#312 else null,
true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else
null, true)])
   +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
temp_struct#312 else null, true)])
  +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282,
priority#283) AS temp_struct#312]
 +- Window [denserank(priority#283) windowspecdefinition(id#279,
name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
+- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
   +- Exchange hashpartitioning(id#279, name#280, 200)
  +- Scan
ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]


And modifying Michael's slightly to use a rank:

import org.apache.spark.sql.functions._

def getColumnWithRank(column: String, rank: Int) = {
  first(when(col("rank") === lit(rank), col(column)).otherwise(null),
ignoreNulls = true)
}

val withRankColumn = data.withColumn("rank",
functions.dense_rank().over(Window.partitionBy("id",
"name").orderBy("priority")))

val modCollapsed = withRankColumn
  .groupBy($"id", $"name")
  .agg(
getColumnWithRank("data", 1) as 'data1,
getColumnWithRank("data", 2) as 'data2,
getColumnWithRank("data", 3) as 'data3,
getColumnWithRank("extra", 1) as 'extra1,
getColumnWithRank("extra", 2) as 'extra2,
getColumnWithRank("extra", 3) as 'extra3)


modCollapsed.explain

== Physical Plan ==
SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965 =
1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN
data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282
ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE
null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null
END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END,
true)])
+- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE WHEN
(rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 3) THEN extra#281 ELSE null END, true)])
   +- *Project [id#279, name#280, extra#281, data#282, rank#965]
  +- Window [denserank(priority#283) windowspecdefinition(id#279,
name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
 +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
+- Exchange hashpartitioning(id#279, name#280, 200)
   +- Scan
ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]



>
>> I'd also check out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski

Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
This is running in YARN cluster mode. It was restarted automatically and
continued fine.
I was trying to see what went wrong. AFAIK there were no task failures.
There is nothing in the executor logs; the log I gave is from the driver.

After some digging, I did see that there was a rebalance in the Kafka logs
around this time. So will the driver fail and exit in such cases?
I've seen drivers exit after a job has hit max retry attempts. This is
different though, right?

Srikanth


On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das 
wrote:

> Does restarting after a few minutes solves the problem? Could be a
> transient issue that lasts long enough for spark task-level retries to all
> fail.
>
> On Tue, Feb 7, 2017 at 4:34 PM, Srikanth  wrote:
>
>> Hello,
>>
>> I had a spark streaming app that reads from kafka running for a few hours
>> after which it failed with error
>>
>> *17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 
>> 148649785 ms
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>>  at 
>> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>>  at 
>> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
>>  at 
>> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
>>  at 
>> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>>  at 
>> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>>  at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*
>>
>> 
>> 
>>
>> 17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: 
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>>
>> 
>> 
>>
>> 17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job 
>> generator (timeout = 5)
>>
>>
>> Driver did not recover from this error and failed. The previous batch ran 
>> 5sec back. There are no indications in the logs that some rebalance happened.
>> As per kafka admin, kafka cluster health was good when this happened and no 
>> maintenance was being done.
>>
>> Any idea what could have gone wrong and why this is a fatal error?
>>
>> Regards,
>> Srikanth
>>
>>
>


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread Michael Segel
Why couldn’t you use the spark thrift server?


On Feb 7, 2017, at 1:28 PM, Cosmin Posteuca 
> wrote:

answer for Gourav Sengupta

I want to use same spark application because i want to work as a FIFO 
scheduler. My problem is that i have many jobs(not so big) and if i run an 
application for every job my cluster will split resources as a FAIR 
scheduler(it's what i observe, maybe i'm wrong) and exist the possibility to 
create bottleneck effect. The start time isn't a problem for me, because it 
isn't a real-time application.

I need a business solution, that's the reason why i can't use code from github.

Thanks!

2017-02-07 19:55 GMT+02:00 Gourav Sengupta 
>:
Hi,

May I ask the reason for using the same spark application? Is it because of the 
time it takes in order to start a spark context?

On another note you may want to look at the number of contributors in a github 
repo before choosing a solution.


Regards,
Gourav

On Tue, Feb 7, 2017 at 5:26 PM, vincent gromakowski 
> wrote:
Spark jobserver or Livy server are the best options for pure technical API.
If you want to publish business API you will probably have to build you own app 
like the one I wrote a year ago https://github.com/elppc/akka-spark-experiments
It combines Akka actors and a shared Spark context to serve concurrent 
subsecond jobs


2017-02-07 15:28 GMT+01:00 ayan guha 
>:
I think you are loking for livy or spark  jobserver

On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca 
> wrote:

I want to run different jobs on demand with same spark context, but i don't 
know how exactly i can do this.

I try to get current context, but seems it create a new spark context(with new 
executors).

I call spark-submit to add new jobs.

I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance), with yarn 
as resource manager.

My code:

val sparkContext = SparkContext.getOrCreate()
val content = 1 to 4
val result = sparkContext.parallelize(content, 5)
result.map(value => value.toString).foreach(loop)

def loop(x: String): Unit = {
   for (a <- 1 to 3000) {

   }
}


spark-submit:

spark-submit --executor-cores 1 \
 --executor-memory 1g \
 --driver-memory 1g \
 --master yarn \
 --deploy-mode cluster \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.dynamicAllocation.maxExecutors=3 \
 --conf spark.dynamicAllocation.initialExecutors=3 \
 --conf spark.executor.instances=3 \


If i run twice spark-submit it create 6 executors, but i want to run all this 
jobs on same spark application.

How can achieve adding jobs to an existing spark application?

I don't understand why SparkContext.getOrCreate() don't get existing spark 
context.


Thanks,

Cosmin P.

--
Best Regards,
Ayan Guha






Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Tathagata Das
Broadcasts are not saved in checkpoints, so you have to save them externally
yourself, and recover them before restarting the stream from checkpoints.

On Tue, Feb 7, 2017 at 3:55 PM, Amit Sela  wrote:

> I know this approach, only thing is, it relies on the transformation being
> an RDD transfomration as well and so could be applied via foreachRDD and
> using the rdd context to avoid a stale context after recovery/resume.
> My question is how to void stale context in a DStream-only transformation
> such as updateStateByKey / mapWithState ?
>
> On Tue, Feb 7, 2017 at 9:19 PM Shixiong(Ryan) Zhu 
> wrote:
>
>> It's documented here: http://spark.apache.org/docs/
>> latest/streaming-programming-guide.html#accumulators-
>> broadcast-variables-and-checkpoints
>>
>> On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela  wrote:
>>
>> Hi all,
>>
>> I was wondering if anyone ever used a broadcast variable within
>> an updateStateByKey op. ? Using it is straight-forward but I was wondering
>> how it'll work after resuming from checkpoint (using the rdd.context()
>> trick is not possible here) ?
>>
>> Thanks,
>> Amit
>>
>>
>>


Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Tathagata Das
Does restarting after a few minutes solves the problem? Could be a
transient issue that lasts long enough for spark task-level retries to all
fail.

On Tue, Feb 7, 2017 at 4:34 PM, Srikanth  wrote:

> Hello,
>
> I had a spark streaming app that reads from kafka running for a few hours
> after which it failed with error
>
> *17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 
> 148649785 ms
> java.lang.IllegalStateException: No current assignment for partition 
> mt_event-5
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*
>
> 
> 
>
> 17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: 
> java.lang.IllegalStateException: No current assignment for partition 
> mt_event-5
> java.lang.IllegalStateException: No current assignment for partition 
> mt_event-5
>
> 
> 
>
> 17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job 
> generator (timeout = 5)
>
>
> Driver did not recover from this error and failed. The previous batch ran 
> 5sec back. There are no indications in the logs that some rebalance happened.
> As per kafka admin, kafka cluster health was good when this happened and no 
> maintenance was being done.
>
> Any idea what could have gone wrong and why this is a fatal error?
>
> Regards,
> Srikanth
>
>


Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Michael Armbrust
I think the fastest way is likely to use a combination of conditionals
(when / otherwise), first (ignoring nulls), while grouping by the id.  This
should get the answer with only a single shuffle.

Here is an example.

On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski  wrote:

> Hi Everett,
>
> That's pretty much what I'd do. Can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
> I'd also check out the execution plan (with explain) to see how it's
> gonna work at runtime. I may have seen groupBy + join be better than
> window (there were more exchanges in play for windows I reckon).
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson 
> wrote:
> >
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski 
> wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
> >> help here too.
> >
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column which is priority order just in case
> priorities
> > aren't
> > // from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >.over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---++-+--+++
> > | id|name|extra|  data|priority|rank|
> > +---++-+--+++
> > |  1|Fred|8|value1|   1|   1|
> > |  1|Fred|8|value8|   2|   2|
> > |  1|Fred|8|value5|   3|   3|
> > |  2| Amy|9|value3|   1|   1|
> > |  2| Amy|9|value5|   2|   2|
> > +---++-+--+++
> >
> > // Now move all the columns we want to denormalize into a struct column
> to
> > keep them together.
> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> > temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---++++
> > | id|name|rank| temp_struct|
> > +---++++
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---++++
> >
> > // groupBy, again, but now pivot the rank column. We need an aggregate
> > function after pivot,
> > // so use first -- there will only ever be one element.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+++++
> > | id|name|   1|   2|   3|
> > +---+++++
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|null|
> > +---+++++
> >
> > // Now just moving things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >  .withColumn("data1", temp3("1").getField("data"))
> >  .withColumn("priority1", temp3("1").getField("priority"))
> >  .withColumn("extra2", temp3("2").getField("extra"))
> >  .withColumn("data2", temp3("2").getField("data"))
> >  .withColumn("priority2", temp3("2").getField("priority"))
> >  .withColumn("extra3", temp3("3").getField("extra"))
> >  .withColumn("data3", temp3("3").getField("data"))
> >  .withColumn("priority3", temp3("3").getField("priority"))
> >  .drop("1", "2", "3")
> >
> > +---++--+--+-+--+--+-+--
> +--+-+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
> > data3|priority3|
> > +---++--+--+-+--+--+-+--
> +--+-+
> > |  1|Fred| 8|value1|1| 8|value8|2| 8|value5|
> > 3|
> > |  2| Amy| 9|value3|1| 9|value5|2|  null|  null|
> > null|
> > +---++--+--+-+--+--+-+--
> +--+-+
> >
> >
> >
> >
> >
> >
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" 
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---++-+--++
> >>> |id |name|extra|data  |priority|
> >>> +---++-+--++
> >>> |1  |Fred|8|value1|1   |
> >>> |1  |Fred|8|value8|2   |
> >>> |1  |Fred|8|value5|3   |
> >>> |2  |Amy |9|value3|1   |
> >>> |2  |Amy |9|value5|2   |
> >>> +---++-+--++
> >>>
> >>> into something 

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Jacek Laskowski
Hi Everett,

That's pretty much what I'd do. Can't think of a way to beat your
solution. Why do you "feel vaguely uneasy about it"?

I'd also check out the execution plan (with explain) to see how it's
gonna work at runtime. I may have seen groupBy + join be better than
window (there were more exchanges in play for windows I reckon).

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson  wrote:
>
>
> On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> help here too.
>
>
> This seems to work, but I do feel vaguely uneasy about it. :)
>
> // First add a 'rank' column which is priority order just in case priorities
> aren't
> // from 1 with no gaps.
> val temp1 = data.withColumn("rank", functions.dense_rank()
>.over(Window.partitionBy("id", "name").orderBy("priority")))
>
> +---++-+--+++
> | id|name|extra|  data|priority|rank|
> +---++-+--+++
> |  1|Fred|8|value1|   1|   1|
> |  1|Fred|8|value8|   2|   2|
> |  1|Fred|8|value5|   3|   3|
> |  2| Amy|9|value3|   1|   1|
> |  2| Amy|9|value5|   2|   2|
> +---++-+--+++
>
> // Now move all the columns we want to denormalize into a struct column to
> keep them together.
> val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> temp1("data"), temp1("priority")))
>   .drop("extra", "data", "priority")
>
> +---++++
> | id|name|rank| temp_struct|
> +---++++
> |  1|Fred|   1|[8,value1,1]|
> |  1|Fred|   2|[8,value8,2]|
> |  1|Fred|   3|[8,value5,3]|
> |  2| Amy|   1|[9,value3,1]|
> |  2| Amy|   2|[9,value5,2]|
> +---++++
>
> // groupBy, again, but now pivot the rank column. We need an aggregate
> function after pivot,
> // so use first -- there will only ever be one element.
> val temp3 = temp2.groupBy("id", "name")
>   .pivot("rank", Seq("1", "2", "3"))
>   .agg(functions.first("temp_struct"))
>
> +---+++++
> | id|name|   1|   2|   3|
> +---+++++
> |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> |  2| Amy|[9,value3,1]|[9,value5,2]|null|
> +---+++++
>
> // Now just moving things out of the structs and clean up.
> val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>  .withColumn("data1", temp3("1").getField("data"))
>  .withColumn("priority1", temp3("1").getField("priority"))
>  .withColumn("extra2", temp3("2").getField("extra"))
>  .withColumn("data2", temp3("2").getField("data"))
>  .withColumn("priority2", temp3("2").getField("priority"))
>  .withColumn("extra3", temp3("3").getField("extra"))
>  .withColumn("data3", temp3("3").getField("data"))
>  .withColumn("priority3", temp3("3").getField("priority"))
>  .drop("1", "2", "3")
>
> +---++--+--+-+--+--+-+--+--+-+
> | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
> data3|priority3|
> +---++--+--+-+--+--+-+--+--+-+
> |  1|Fred| 8|value1|1| 8|value8|2| 8|value5|
> 3|
> |  2| Amy| 9|value3|1| 9|value5|2|  null|  null|
> null|
> +---++--+--+-+--+--+-+--+--+-+
>
>
>
>
>
>
>
>>
>>
>> Jacek
>>
>> On 7 Feb 2017 8:02 p.m., "Everett Anderson" 
>> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to un-explode or denormalize a table like
>>>
>>> +---++-+--++
>>> |id |name|extra|data  |priority|
>>> +---++-+--++
>>> |1  |Fred|8|value1|1   |
>>> |1  |Fred|8|value8|2   |
>>> |1  |Fred|8|value5|3   |
>>> |2  |Amy |9|value3|1   |
>>> |2  |Amy |9|value5|2   |
>>> +---++-+--++
>>>
>>> into something that looks like
>>>
>>>
>>> +---++--+--+-+--+--+-+--+--+-+
>>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>>> |priority3|
>>>
>>> +---++--+--+-+--+--+-+--+--+-+
>>> |1  |Fred|8 |value1|1|8 |value8|2|8 |value5|3
>>> |
>>> |2  |Amy |9 |value3|1|9 |value5|2|null  |null
>>> |null |
>>>
>>> +---++--+--+-+--+--+-+--+--+-+
>>>
>>> If I were going the other direction, I'd create a new column with an
>>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>>> 

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski  wrote:

> Hi,
>
> Could groupBy and withColumn or UDAF work perhaps? I think window could
> help here too.
>

This seems to work, but I do feel vaguely uneasy about it. :)

// First add a 'rank' column which is priority order just in case
priorities aren't
// from 1 with no gaps.
val temp1 = data.withColumn("rank", functions.dense_rank()
   .over(Window.partitionBy("id", "name").orderBy("priority")))

+---++-+--+++
| id|name|extra|  data|priority|rank|
+---++-+--+++
|  1|Fred|8|value1|   1|   1|
|  1|Fred|8|value8|   2|   2|
|  1|Fred|8|value5|   3|   3|
|  2| Amy|9|value3|   1|   1|
|  2| Amy|9|value5|   2|   2|
+---++-+--+++

// Now move all the columns we want to denormalize into a struct column to
keep them together.
val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
temp1("data"), temp1("priority")))
  .drop("extra", "data", "priority")

+---++++
| id|name|rank| temp_struct|
+---++++
|  1|Fred|   1|[8,value1,1]|
|  1|Fred|   2|[8,value8,2]|
|  1|Fred|   3|[8,value5,3]|
|  2| Amy|   1|[9,value3,1]|
|  2| Amy|   2|[9,value5,2]|
+---++++

// groupBy, again, but now pivot the rank column. We need an aggregate
function after pivot,
// so use first -- there will only ever be one element.
val temp3 = temp2.groupBy("id", "name")
  .pivot("rank", Seq("1", "2", "3"))
  .agg(functions.first("temp_struct"))

+---+++++
| id|name|   1|   2|   3|
+---+++++
|  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
|  2| Amy|[9,value3,1]|[9,value5,2]|null|
+---+++++

// Now just moving things out of the structs and clean up.
val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
 .withColumn("data1", temp3("1").getField("data"))
 .withColumn("priority1", temp3("1").getField("priority"))
 .withColumn("extra2", temp3("2").getField("extra"))
 .withColumn("data2", temp3("2").getField("data"))
 .withColumn("priority2", temp3("2").getField("priority"))
 .withColumn("extra3", temp3("3").getField("extra"))
 .withColumn("data3", temp3("3").getField("data"))
 .withColumn("priority3", temp3("3").getField("priority"))
 .drop("1", "2", "3")

+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
|  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
+---+----+------+------+---------+------+------+---------+------+------+---------+








>
> Jacek
>
> On 7 Feb 2017 8:02 p.m., "Everett Anderson" 
> wrote:
>
>> Hi,
>>
>> I'm trying to un-explode or denormalize a table like
>>
>> +---++-+--++
>> |id |name|extra|data  |priority|
>> +---++-+--++
>> |1  |Fred|8|value1|1   |
>> |1  |Fred|8|value8|2   |
>> |1  |Fred|8|value5|3   |
>> |2  |Amy |9|value3|1   |
>> |2  |Amy |9|value5|2   |
>> +---++-+--++
>>
>> into something that looks like
>>
>> +---++--+--+-+--+--+-+--
>> +--+-+
>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>> |priority3|
>> +---++--+--+-+--+--+-+--
>> +--+-+
>> |1  |Fred|8 |value1|1|8 |value8|2|8 |value5|3
>>|
>> |2  |Amy |9 |value3|1|9 |value5|2|null  |null
>>  |null |
>> +---++--+--+-+--+--+-+--
>> +--+-+
>>
>> If I were going the other direction, I'd create a new column with an
>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>> explode it.
>>
>> Going from the more normalized view, though, I'm having a harder time.
>>
>> I want to group or partition by (id, name) and order by priority, but
>> after that I can't figure out how to get multiple rows rotated into one.
>>
>> Any ideas?
>>
>> Here's the code to create the input table above:
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.Dataset
>> import org.apache.spark.sql.types._
>>
>> val rowsRDD = sc.parallelize(Seq(
>> Row(1, "Fred", 8, "value1", 1),
>> Row(1, "Fred", 8, "value8", 2),
>> Row(1, "Fred", 8, "value5", 3),
>> Row(2, "Amy", 9, "value3", 1),
>> Row(2, "Amy", 9, "value5", 2)))
>>
>> val schema = StructType(Seq(
>> StructField("id", IntegerType, nullable = true),
>> 

Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
Hello,

I had a spark streaming app that reads from kafka running for a few hours
after which it failed with error

*17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time
148649785 ms
java.lang.IllegalStateException: No current assignment for partition mt_event-5
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*




17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception:
java.lang.IllegalStateException: No current assignment for partition
mt_event-5
java.lang.IllegalStateException: No current assignment for partition mt_event-5




17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job
generator (timeout = 5)


Driver did not recover from this error and failed. The previous batch
ran 5sec back. There are no indications in the logs that some
rebalance happened.
As per kafka admin, kafka cluster health was good when this happened
and no maintenance was being done.

Any idea what could have gone wrong and why this is a fatal error?

Regards,
Srikanth


Re: submit a spark code on google cloud

2017-02-07 Thread Dinko Srkoč
Getting to the Spark web UI when Spark is running on Dataproc is not
that straightforward. Connecting to that web interface is a two step
process:

1. create an SSH tunnel
2. configure the browser to use a SOCKS proxy to connect

The above steps are described here:
https://cloud.google.com/dataproc/docs/concepts/cluster-web-interfaces

Once you have your browser configured and running, go to the
http://:4040 for the Spark web UI and
http://:18080 for Spark's history server.

 is the name of the cluster with "-m" appended. So,
if the cluster name is "mycluster", the master will be called
"mycluster-m".

Cheers,
Dinko

On 7 February 2017 at 21:41, Jacek Laskowski  wrote:
> Hi,
>
> I know nothing about Spark in GCP so answering this for a pure Spark.
>
> Can you use web UI and Executors tab or a SparkListener?
>
> Jacek
>
> On 7 Feb 2017 5:33 p.m., "Anahita Talebi"  wrote:
>
> Hello Friends,
>
> I am trying to run a spark code on multiple machines. To this aim, I submit
> a spark code on submit job on google cloud platform.
> https://cloud.google.com/dataproc/docs/guides/submit-job
>
> I have created a cluster with 6 nodes. Does anyone know how I can realize
> which nodes are participated when I run the code on the cluster?
>
> Thanks a lot,
> Anahita
>
>




Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Amit Sela
I know this approach; the only thing is, it relies on the transformation being
an RDD transformation as well, and so it could be applied via foreachRDD,
using the RDD's context to avoid a stale context after recovery/resume.
My question is how to avoid a stale context in a DStream-only transformation
such as updateStateByKey / mapWithState?

On Tue, Feb 7, 2017 at 9:19 PM Shixiong(Ryan) Zhu 
wrote:

> It's documented here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
>
> On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela  wrote:
>
> Hi all,
>
> I was wondering if anyone ever used a broadcast variable within
> an updateStateByKey op. ? Using it is straight-forward but I was wondering
> how it'll work after resuming from checkpoint (using the rdd.context()
> trick is not possible here) ?
>
> Thanks,
> Amit
>
>
>


Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Jacek Laskowski
Hi,

Could groupBy and withColumn or UDAF work perhaps? I think window could
help here too.

Jacek

On 7 Feb 2017 8:02 p.m., "Everett Anderson" 
wrote:

> Hi,
>
> I'm trying to un-explode or denormalize a table like
>
> +---++-+--++
> |id |name|extra|data  |priority|
> +---++-+--++
> |1  |Fred|8|value1|1   |
> |1  |Fred|8|value8|2   |
> |1  |Fred|8|value5|3   |
> |2  |Amy |9|value3|1   |
> |2  |Amy |9|value5|2   |
> +---++-+--++
>
> into something that looks like
>
> +---++--+--+-+--+--+-+--
> +--+-+
> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
> |priority3|
> +---++--+--+-+--+--+-+--
> +--+-+
> |1  |Fred|8 |value1|1|8 |value8|2|8 |value5|3
>|
> |2  |Amy |9 |value3|1|9 |value5|2|null  |null
>  |null |
> +---++--+--+-+--+--+-+--
> +--+-+
>
> If I were going the other direction, I'd create a new column with an array
> of structs, each with 'extra', 'data', and 'priority' fields and then
> explode it.
>
> Going from the more normalized view, though, I'm having a harder time.
>
> I want to group or partition by (id, name) and order by priority, but
> after that I can't figure out how to get multiple rows rotated into one.
>
> Any ideas?
>
> Here's the code to create the input table above:
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
> Row(1, "Fred", 8, "value1", 1),
> Row(1, "Fred", 8, "value8", 2),
> Row(1, "Fred", 8, "value5", 3),
> Row(2, "Amy", 9, "value3", 1),
> Row(2, "Amy", 9, "value5", 2)))
>
> val schema = StructType(Seq(
> StructField("id", IntegerType, nullable = true),
> StructField("name", StringType, nullable = true),
> StructField("extra", IntegerType, nullable = true),
> StructField("data", StringType, nullable = true),
> StructField("priority", IntegerType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
>
>
>


Re: submit a spark code on google cloud

2017-02-07 Thread Jacek Laskowski
Hi,

I know nothing about Spark in GCP so answering this for a pure Spark.

Can you use web UI and Executors tab or a SparkListener?
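A rough sketch of the listener route (class name and message text are made up for illustration): log the host of every executor that registers with the driver, which shows which cluster nodes take part in the job.

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

class NodeListener extends SparkListener {
  // Called once per executor as it registers; executorHost is the node it runs on.
  override def onExecutorAdded(e: SparkListenerExecutorAdded): Unit =
    println(s"executor ${e.executorId} started on host ${e.executorInfo.executorHost}")
}

// sc.addSparkListener(new NodeListener)  // register before running the job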

Jacek

On 7 Feb 2017 5:33 p.m., "Anahita Talebi"  wrote:

Hello Friends,

I am trying to run a spark code on multiple machines. To this aim, I submit
a spark code on submit job on google cloud platform.
https://cloud.google.com/dataproc/docs/guides/submit-job

I have created a cluster with 6 nodes. Does anyone know how I can realize
which nodes are participated when I run the code on the cluster?

Thanks a lot,
Anahita


Re: How to get a spark sql statement implement duration ?

2017-02-07 Thread Jacek Laskowski
On 7 Feb 2017 4:17 a.m., "Mars Xu"  wrote:

Hello All,

Some spark sqls will produce one or more jobs, I have 2 questions,

1, How the cc.sql(“sql statement”) divided into one or more jobs ?


It's an implementation detail. You can have zero or more jobs for a single
structured query (query DSL or SQL).

2, When I execute spark sql query in spark - shell client, how to
get the execution time (Spark  2.1.0) ?  if a sql query produced 3 jobs, In
my opinion, the execution time is to sum up the 3 jobs’ duration time.


Yes. What's the question then?
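One way to get that sum is a small listener (a sketch only; nothing here is SQL-specific, it simply adds up the wall-clock time of every job the query triggers):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import scala.collection.concurrent.TrieMap

class JobTimeListener extends SparkListener {
  private val starts = TrieMap.empty[Int, Long]   // jobId -> start time (ms)
  @volatile var totalMs: Long = 0L

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    starts.put(jobStart.jobId, jobStart.time)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    starts.remove(jobEnd.jobId).foreach(start => totalMs += jobEnd.time - start)
}

// In spark-shell:
// val listener = new JobTimeListener
// spark.sparkContext.addSparkListener(listener)
// spark.sql("select count(*) from t").collect()
// println(s"total job time: ${listener.totalMs} ms")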

Jacek


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-07 Thread Jacek Laskowski
Hi,

Have you considered foreach sink?
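For reference, the shape of a foreach sink (a sketch only; streamingDF and the write logic are placeholders, and how rows end up in the Hive table - including the metastore update - is entirely up to the writer):

import org.apache.spark.sql.{ForeachWriter, Row}

val query = streamingDF.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true // open a connection/buffer per partition
    def process(value: Row): Unit = {
      // append `value` to the target table here
    }
    def close(errorOrNull: Throwable): Unit = {
      // flush and close
    }
  })
  .start()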

Jacek

On 6 Feb 2017 8:39 p.m., "Egor Pahomov"  wrote:

> Hi, I'm thinking of using Structured Streaming instead of old streaming,
> but I need to be able to save results to Hive table. Documentation for file
> sink says(http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#output-sinks): "Supports writes to
> partitioned tables. ". But being able to write to partitioned directories
> is not enough to write to the table: someone needs to write to Hive
> metastore. How can I use Structured Streaming and write to Hive table?
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread Cosmin Posteuca
answer for Gourav Sengupta

I want to use the same Spark application because I want it to work as a FIFO
scheduler. My problem is that I have many jobs (not so big), and if I run an
application for every job my cluster will split resources like a FAIR
scheduler (that's what I observe, maybe I'm wrong), and there is the
possibility of creating a bottleneck effect. The start time isn't a problem
for me, because it isn't a real-time application.

I need a business solution; that's the reason why I can't use code from
github.

Thanks!

2017-02-07 19:55 GMT+02:00 Gourav Sengupta :

> Hi,
>
> May I ask the reason for using the same spark application? Is it because
> of the time it takes in order to start a spark context?
>
> On another note you may want to look at the number of contributors in a
> github repo before choosing a solution.
>
>
> Regards,
> Gourav
>
> On Tue, Feb 7, 2017 at 5:26 PM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Spark jobserver or Livy server are the best options for pure technical
>> API.
>> If you want to publish business API you will probably have to build you
>> own app like the one I wrote a year ago https://github.com/elppc/akka-
>> spark-experiments
>> It combines Akka actors and a shared Spark context to serve concurrent
>> subsecond jobs
>>
>>
>> 2017-02-07 15:28 GMT+01:00 ayan guha :
>>
>>> I think you are loking for livy or spark  jobserver
>>>
>>> On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca <
>>> cosmin.poste...@gmail.com> wrote:
>>>
 I want to run different jobs on demand with same spark context, but i
 don't know how exactly i can do this.

 I try to get current context, but seems it create a new spark
 context(with new executors).

 I call spark-submit to add new jobs.

 I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance),
 with yarn as resource manager.

 My code:

 val sparkContext = SparkContext.getOrCreate()
 val content = 1 to 4
 val result = sparkContext.parallelize(content, 5)
 result.map(value => value.toString).foreach(loop)

 def loop(x: String): Unit = {
for (a <- 1 to 3000) {

}
 }

 spark-submit:

 spark-submit --executor-cores 1 \
  --executor-memory 1g \
  --driver-memory 1g \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=3 \
  --conf spark.dynamicAllocation.initialExecutors=3 \
  --conf spark.executor.instances=3 \

 If i run twice spark-submit it create 6 executors, but i want to run
 all this jobs on same spark application.

 How can achieve adding jobs to an existing spark application?

 I don't understand why SparkContext.getOrCreate() don't get existing
 spark context.


 Thanks,

 Cosmin P.

>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>


Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Shixiong(Ryan) Zhu
It's documented here:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
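The pattern that section illustrates, in rough form (the blacklist contents are placeholders): keep the broadcast in a lazily-created singleton, so it is rebuilt on first use after a restart instead of being read back from the checkpoint.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object WordBlacklist {
  @volatile private var instance: Broadcast[Seq[String]] = _

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Seq("a", "b", "c")) // reload the real data here
        }
      }
    }
    instance
  }
}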

On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela  wrote:

> Hi all,
>
> I was wondering if anyone ever used a broadcast variable within
> an updateStateByKey op. ? Using it is straight-forward but I was wondering
> how it'll work after resuming from checkpoint (using the rdd.context()
> trick is not possible here) ?
>
> Thanks,
> Amit
>


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread Cosmin Posteuca
Response for vincent:

Thanks for the answer!

Yes, I need a business solution; that's the reason why I can't use the Spark
jobserver or Livy solutions. I will look at your github to see how to build
such a system.

But I don't understand why Spark doesn't have a solution for this kind of
problem, and why I can't get the existing context and run some code on it.

Thanks

2017-02-07 19:26 GMT+02:00 vincent gromakowski <
vincent.gromakow...@gmail.com>:

> Spark jobserver or Livy server are the best options for pure technical API.
> If you want to publish business API you will probably have to build you
> own app like the one I wrote a year ago https://github.com/elppc/akka-
> spark-experiments
> It combines Akka actors and a shared Spark context to serve concurrent
> subsecond jobs
>
>
> 2017-02-07 15:28 GMT+01:00 ayan guha :
>
>> I think you are loking for livy or spark  jobserver
>>
>> On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca <
>> cosmin.poste...@gmail.com> wrote:
>>
>>> I want to run different jobs on demand with same spark context, but i
>>> don't know how exactly i can do this.
>>>
>>> I try to get current context, but seems it create a new spark
>>> context(with new executors).
>>>
>>> I call spark-submit to add new jobs.
>>>
>>> I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance),
>>> with yarn as resource manager.
>>>
>>> My code:
>>>
>>> val sparkContext = SparkContext.getOrCreate()
>>> val content = 1 to 4
>>> val result = sparkContext.parallelize(content, 5)
>>> result.map(value => value.toString).foreach(loop)
>>>
>>> def loop(x: String): Unit = {
>>>for (a <- 1 to 3000) {
>>>
>>>}
>>> }
>>>
>>> spark-submit:
>>>
>>> spark-submit --executor-cores 1 \
>>>  --executor-memory 1g \
>>>  --driver-memory 1g \
>>>  --master yarn \
>>>  --deploy-mode cluster \
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.shuffle.service.enabled=true \
>>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>>  --conf spark.dynamicAllocation.maxExecutors=3 \
>>>  --conf spark.dynamicAllocation.initialExecutors=3 \
>>>  --conf spark.executor.instances=3 \
>>>
>>> If i run twice spark-submit it create 6 executors, but i want to run all
>>> this jobs on same spark application.
>>>
>>> How can achieve adding jobs to an existing spark application?
>>>
>>> I don't understand why SparkContext.getOrCreate() don't get existing
>>> spark context.
>>>
>>>
>>> Thanks,
>>>
>>> Cosmin P.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job

2017-02-07 Thread Jacek Laskowski
Hi,

I may have seen this issue already...

What's the cluster manager? How do you spark-submit?

Jacek

On 7 Feb 2017 7:44 p.m., "dgoldenberg"  wrote:

Hi,

Any reason why we might be getting this error? The code seems to work fine
in the non-distributed mode but the same code when run from a Spark job is
not able to get to Elastic.

Spark version: 2.0.1 built for Hadoop 2.4, Scala 2.11
Elastic version: 2.3.1

I've verified the Elastic hosts and the cluster name.

The spot in the code where this happens is:

ClusterHealthResponse clusterHealthResponse = client.admin().cluster()
.prepareHealth()
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.get();

Stack trace:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
at com.myco.MyDriver$3.call(com.myco.MyDriver.java:214)
at com.myco.MyDriver$3.call(KafkaSparkStreamingDriver.java:201)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at

Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
Hi,

I'm trying to un-explode or denormalize a table like

+---+----+-----+------+--------+
|id |name|extra|data  |priority|
+---+----+-----+------+--------+
|1  |Fred|8    |value1|1       |
|1  |Fred|8    |value8|2       |
|1  |Fred|8    |value5|3       |
|2  |Amy |9    |value3|1       |
|2  |Amy |9    |value5|2       |
+---+----+-----+------+--------+

into something that looks like

+---+----+------+------+---------+------+------+---------+------+------+---------+
|id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
|2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
+---+----+------+------+---------+------+------+---------+------+------+---------+

If I were going the other direction, I'd create a new column with an array
of structs, each with 'extra', 'data', and 'priority' fields and then
explode it.

Going from the more normalized view, though, I'm having a harder time.

I want to group or partition by (id, name) and order by priority, but after
that I can't figure out how to get multiple rows rotated into one.

Any ideas?

Here's the code to create the input table above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
Row(1, "Fred", 8, "value1", 1),
Row(1, "Fred", 8, "value8", 2),
Row(1, "Fred", 8, "value5", 3),
Row(2, "Amy", 9, "value3", 1),
Row(2, "Amy", 9, "value5", 2)))

val schema = StructType(Seq(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("extra", IntegerType, nullable = true),
StructField("data", StringType, nullable = true),
StructField("priority", IntegerType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)
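
One way to do the rotation (a minimal sketch, assuming the Spark 2.x DataFrame
API and at most three priorities per (id, name) key; "grouped" and "rotated" are
just illustrative names) is to collect each group into a priority-sorted array
of structs and then select fields by position; missing positions come back as
null:

import org.apache.spark.sql.functions._

// Gather each (id, name) group into an array of structs sorted by priority
// (priority is the first struct field, so sort_array orders by it).
val grouped = data
  .groupBy("id", "name")
  .agg(sort_array(collect_list(struct(col("priority"), col("extra"), col("data")))).as("rows"))

// Flatten positions 0..2 into named columns; out-of-range positions yield null.
val rotated = grouped.select(
  col("id"), col("name"),
  col("rows")(0)("extra").as("extra1"), col("rows")(0)("data").as("data1"), col("rows")(0)("priority").as("priority1"),
  col("rows")(1)("extra").as("extra2"), col("rows")(1)("data").as("data2"), col("rows")(1)("priority").as("priority2"),
  col("rows")(2)("extra").as("extra3"), col("rows")(2)("data").as("data3"), col("rows")(2)("priority").as("priority3"))

rotated.show(false)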


Re: Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

2017-02-07 Thread Shixiong(Ryan) Zhu
You can create lazily instantiated singleton instances. See
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
for examples of accumulators and broadcast variables. You can use the same
approach to create your cached RDD.
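
For what it's worth, a minimal sketch of that lazily instantiated singleton
pattern applied to a broadcast variable, along the lines of the WordBlacklist
example in the linked guide (the object name and contents are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// The broadcast is not part of the checkpointed DStream graph, so it is simply
// re-created on first use after a restart from checkpoint.
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Seq("a", "b", "c"))
        }
      }
    }
    instance
  }
}

// Driver-side code, e.g. inside foreachRDD, can then fetch it with:
//   val blacklist = WordBlacklist.getInstance(rdd.sparkContext)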

On Tue, Feb 7, 2017 at 10:45 AM, shyla deshpande 
wrote:

> and my cached RDD is not small. If it was maybe I could materialize and
> broadcast.
>
> Thanks
>
> On Tue, Feb 7, 2017 at 10:28 AM, shyla deshpande  > wrote:
>
>> I have a situation similar to the following and I get SPARK-13758 
>> .
>>
>>
>> I understand why I get this error, but I want to know what should be the 
>> approach in dealing with these situations.
>>
>>
>> Thanks
>>
>>
>> > var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
>> > val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
>> > words.foreachRDD((rdd: RDD[String]) => {
>> >   val res = rdd.map(word => (word, word.length)).collect()
>> >   println("words: " + res.mkString(", "))
>> >   cached = cached.union(rdd)
>> >   cached.checkpoint()
>> >   println("cached words: " + cached.collect.mkString(", "))
>> > })
>>
>>
>


Re: Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

2017-02-07 Thread shyla deshpande
And my cached RDD is not small. If it were, maybe I could materialize and
broadcast it.

Thanks

On Tue, Feb 7, 2017 at 10:28 AM, shyla deshpande 
wrote:

> I have a situation similar to the following and I get SPARK-13758 
> .
>
>
> I understand why I get this error, but I want to know what should be the 
> approach in dealing with these situations.
>
>
> Thanks
>
>
> > var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
> > val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
> > words.foreachRDD((rdd: RDD[String]) => {
> >   val res = rdd.map(word => (word, word.length)).collect()
> >   println("words: " + res.mkString(", "))
> >   cached = cached.union(rdd)
> >   cached.checkpoint()
> >   println("cached words: " + cached.collect.mkString(", "))
> > })
>
>


NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job

2017-02-07 Thread dgoldenberg
Hi,

Any reason why we might be getting this error? The code seems to work fine
in non-distributed mode, but the same code, when run from a Spark job, is
not able to reach Elasticsearch.

Spark version: 2.0.1 built for Hadoop 2.4, Scala 2.11
Elastic version: 2.3.1

I've verified the Elastic hosts and the cluster name.

The spot in the code where this happens is:

ClusterHealthResponse clusterHealthResponse = client.admin().cluster()
.prepareHealth()
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.get();

Stack trace:

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)
at
org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)
at
org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
at com.myco.MyDriver$3.call(com.myco.MyDriver.java:214)
at com.myco.MyDriver$3.call(KafkaSparkStreamingDriver.java:201)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: NoNodeAvailableException[None of the configured nodes are
available: 

Re: Launching an Spark application in a subset of machines

2017-02-07 Thread Michael Gummelt
> Looking into Mesos attributes this seems the perfect fit for it. Is that
correct?

Yes.

On Tue, Feb 7, 2017 at 3:43 AM, Muhammad Asif Abbasi 
wrote:

> YARN provides the concept of node labels. You should explore the
> "spark.yarn.executor.nodeLabelConfiguration" property.
>
>
> Cheers,
> Asif Abbasi
>
> On Tue, 7 Feb 2017 at 10:21, Alvaro Brandon 
> wrote:
>
>> Hello all:
>>
>> I have the following scenario.
>> - I have a cluster of 50 machines with Hadoop and Spark installed on
>> them.
>> - I want to launch one Spark application through spark submit. However I
>> want this application to run on only a subset of these machines,
>> disregarding data locality. (e.g. 10 machines)
>>
>> Is this possible?. Is there any option in the standalone scheduler, YARN
>> or Mesos that allows such thing?.
>>
>>
>>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

2017-02-07 Thread shyla deshpande
I have a situation similar to the following and I get SPARK-13758
.


I understand why I get this error, but I want to know what the approach
should be for dealing with these situations.


Thanks


> var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
> val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
> words.foreachRDD((rdd: RDD[String]) => {
>   val res = rdd.map(word => (word, word.length)).collect()
>   println("words: " + res.mkString(", "))
>   cached = cached.union(rdd)
>   cached.checkpoint()
>   println("cached words: " + cached.collect.mkString(", "))
> })


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread Gourav Sengupta
Hi,

May I ask the reason for using the same spark application? Is it because of
the time it takes in order to start a spark context?

On another note you may want to look at the number of contributors in a
github repo before choosing a solution.


Regards,
Gourav

On Tue, Feb 7, 2017 at 5:26 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Spark jobserver or Livy server are the best options for pure technical API.
> If you want to publish business API you will probably have to build you
> own app like the one I wrote a year ago https://github.com/elppc/akka-
> spark-experiments
> It combines Akka actors and a shared Spark context to serve concurrent
> subsecond jobs
>
>
> 2017-02-07 15:28 GMT+01:00 ayan guha :
>
>> I think you are loking for livy or spark  jobserver
>>
>> On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca <
>> cosmin.poste...@gmail.com> wrote:
>>
>>> I want to run different jobs on demand with same spark context, but i
>>> don't know how exactly i can do this.
>>>
>>> I try to get current context, but seems it create a new spark
>>> context(with new executors).
>>>
>>> I call spark-submit to add new jobs.
>>>
>>> I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance),
>>> with yarn as resource manager.
>>>
>>> My code:
>>>
>>> val sparkContext = SparkContext.getOrCreate()
>>> val content = 1 to 4
>>> val result = sparkContext.parallelize(content, 5)
>>> result.map(value => value.toString).foreach(loop)
>>>
>>> def loop(x: String): Unit = {
>>>for (a <- 1 to 3000) {
>>>
>>>}
>>> }
>>>
>>> spark-submit:
>>>
>>> spark-submit --executor-cores 1 \
>>>  --executor-memory 1g \
>>>  --driver-memory 1g \
>>>  --master yarn \
>>>  --deploy-mode cluster \
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.shuffle.service.enabled=true \
>>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>>  --conf spark.dynamicAllocation.maxExecutors=3 \
>>>  --conf spark.dynamicAllocation.initialExecutors=3 \
>>>  --conf spark.executor.instances=3 \
>>>
>>> If i run twice spark-submit it create 6 executors, but i want to run all
>>> this jobs on same spark application.
>>>
>>> How can achieve adding jobs to an existing spark application?
>>>
>>> I don't understand why SparkContext.getOrCreate() don't get existing
>>> spark context.
>>>
>>>
>>> Thanks,
>>>
>>> Cosmin P.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread vincent gromakowski
Spark Jobserver or Livy server are the best options for a purely technical API.
If you want to publish a business API you will probably have to build your own
app like the one I wrote a year ago:
https://github.com/elppc/akka-spark-experiments
It combines Akka actors and a shared Spark context to serve concurrent
sub-second jobs.


2017-02-07 15:28 GMT+01:00 ayan guha :

> I think you are loking for livy or spark  jobserver
>
> On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca 
> wrote:
>
>> I want to run different jobs on demand with same spark context, but i
>> don't know how exactly i can do this.
>>
>> I try to get current context, but seems it create a new spark
>> context(with new executors).
>>
>> I call spark-submit to add new jobs.
>>
>> I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance), with
>> yarn as resource manager.
>>
>> My code:
>>
>> val sparkContext = SparkContext.getOrCreate()
>> val content = 1 to 4
>> val result = sparkContext.parallelize(content, 5)
>> result.map(value => value.toString).foreach(loop)
>>
>> def loop(x: String): Unit = {
>>for (a <- 1 to 3000) {
>>
>>}
>> }
>>
>> spark-submit:
>>
>> spark-submit --executor-cores 1 \
>>  --executor-memory 1g \
>>  --driver-memory 1g \
>>  --master yarn \
>>  --deploy-mode cluster \
>>  --conf spark.dynamicAllocation.enabled=true \
>>  --conf spark.shuffle.service.enabled=true \
>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>  --conf spark.dynamicAllocation.maxExecutors=3 \
>>  --conf spark.dynamicAllocation.initialExecutors=3 \
>>  --conf spark.executor.instances=3 \
>>
>> If i run twice spark-submit it create 6 executors, but i want to run all
>> this jobs on same spark application.
>>
>> How can achieve adding jobs to an existing spark application?
>>
>> I don't understand why SparkContext.getOrCreate() don't get existing
>> spark context.
>>
>>
>> Thanks,
>>
>> Cosmin P.
>>
> --
> Best Regards,
> Ayan Guha
>


Re: About saving a model file

2017-02-07 Thread durgaswaroop
Did you find any solution? Are you using a Spark cluster?






No topicDistributions(..) method in ml.clustering.LocalLDAModel

2017-02-07 Thread sachintyagi22
Hi, 

I was using ml.clustering.LDA for topic modelling (with online optimizer)
and it returns ml.clustering.LocalLDAModel. However, using this model there
doesn't seem to be any way to get the topic distribution over documents.
The older mllib API (mllib.clustering.LocalLDAModel), however, does have a method
for exactly that -- topicDistributions(..).

I am not sure why this is so, especially given that the new ml.LDA uses the older
mllib.LDA and wraps the older mllib.LocalLDAModel in the new
ml.LocalLDAModel.

So, can someone please clarify:
1. Why this is so?
2. What is the correct way to get topic distributions in the new
LocalLDAModel?

Thanks!
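
For reference, a sketch of how the per-document topic mixture is obtained with
the ml API (assuming Spark 2.x; "docs" is a hypothetical DataFrame with a Vector
column of token counts named "features", and the output column is whatever
setTopicDistributionCol is set to, "topicDistribution" by default):

import org.apache.spark.ml.clustering.LDA

// In the ml API the topic distribution is exposed through transform(), which
// appends a vector column, rather than through a topicDistributions(..) method.
val lda = new LDA()
  .setK(10)
  .setOptimizer("online")
  .setFeaturesCol("features")                    // assumed input column
  .setTopicDistributionCol("topicDistribution")  // default name, set explicitly here

val model = lda.fit(docs)                        // docs: DataFrame[..., features: Vector]
val withTopics = model.transform(docs)           // adds the topicDistribution column
withTopics.select("topicDistribution").show(false)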






submit a spark code on google cloud

2017-02-07 Thread Anahita Talebi
Hello Friends,

I am trying to run Spark code on multiple machines. To this aim, I submit the
Spark code through the Dataproc submit-job page on Google Cloud Platform:
https://cloud.google.com/dataproc/docs/guides/submit-job

I have created a cluster with 6 nodes. Does anyone know how I can find out
which nodes participated when I run the code on the cluster?

Thanks a lot,
Anahita


Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Amit Sela
Hi all,

I was wondering if anyone has ever used a broadcast variable within
an updateStateByKey op? Using it is straightforward, but I was wondering
how it'll work after resuming from a checkpoint (using the rdd.context()
trick is not possible here).

Thanks,
Amit


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread ayan guha
I think you are looking for Livy or Spark Jobserver.
On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca 
wrote:

> I want to run different jobs on demand with same spark context, but i
> don't know how exactly i can do this.
>
> I try to get current context, but seems it create a new spark context(with
> new executors).
>
> I call spark-submit to add new jobs.
>
> I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance), with
> yarn as resource manager.
>
> My code:
>
> val sparkContext = SparkContext.getOrCreate()
> val content = 1 to 4
> val result = sparkContext.parallelize(content, 5)
> result.map(value => value.toString).foreach(loop)
>
> def loop(x: String): Unit = {
>for (a <- 1 to 3000) {
>
>}
> }
>
> spark-submit:
>
> spark-submit --executor-cores 1 \
>  --executor-memory 1g \
>  --driver-memory 1g \
>  --master yarn \
>  --deploy-mode cluster \
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.shuffle.service.enabled=true \
>  --conf spark.dynamicAllocation.minExecutors=1 \
>  --conf spark.dynamicAllocation.maxExecutors=3 \
>  --conf spark.dynamicAllocation.initialExecutors=3 \
>  --conf spark.executor.instances=3 \
>
> If i run twice spark-submit it create 6 executors, but i want to run all
> this jobs on same spark application.
>
> How can achieve adding jobs to an existing spark application?
>
> I don't understand why SparkContext.getOrCreate() don't get existing
> spark context.
>
>
> Thanks,
>
> Cosmin P.
>
-- 
Best Regards,
Ayan Guha


[Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread Cosmin Posteuca
I want to run different jobs on demand with the same Spark context, but I don't
know exactly how I can do this.

I try to get the current context, but it seems it creates a new Spark context
(with new executors).

I call spark-submit to add new jobs.

I run the code on Amazon EMR (3 instances, 4 cores & 16 GB RAM per instance), with
YARN as the resource manager.

My code:

val sparkContext = SparkContext.getOrCreate()
val content = 1 to 4
val result = sparkContext.parallelize(content, 5)
result.map(value => value.toString).foreach(loop)

def loop(x: String): Unit = {
   for (a <- 1 to 3000) {

   }
}

spark-submit:

spark-submit --executor-cores 1 \
 --executor-memory 1g \
 --driver-memory 1g \
 --master yarn \
 --deploy-mode cluster \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.dynamicAllocation.maxExecutors=3 \
 --conf spark.dynamicAllocation.initialExecutors=3 \
 --conf spark.executor.instances=3 \

If I run spark-submit twice, it creates 6 executors, but I want to run all
these jobs in the same Spark application.

How can I achieve adding jobs to an existing Spark application?

I don't understand why SparkContext.getOrCreate() doesn't get the existing Spark
context.


Thanks,

Cosmin P.
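
A note on the last point: SparkContext.getOrCreate() only reuses a context
within the same JVM, and every spark-submit starts a new driver JVM, hence the
extra executors. A minimal sketch of the alternative discussed in this thread,
a single long-running driver that runs on-demand jobs as actions on one shared
context (all names are illustrative, and the request-handling layer over HTTP,
Akka or a queue is left out):

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object SharedContextApp {
  def main(args: Array[String]): Unit = {
    // One context for the lifetime of the application; submitted jobs share it.
    val sc = SparkContext.getOrCreate(new SparkConf().setAppName("shared-context"))
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Each "on demand job" is just another action on the shared context.
    def runJob(jobId: Int): Future[Long] = Future {
      sc.parallelize(1 to 1000000, 5).map(_.toString).count()
    }

    // In a real service these requests would arrive over HTTP, Akka, a queue, etc.
    val results = (1 to 3).map(runJob)
    results.foreach(f => println(Await.result(f, Duration.Inf)))

    sc.stop()
  }
}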


Re: Launching an Spark application in a subset of machines

2017-02-07 Thread Muhammad Asif Abbasi
YARN provides the concept of node labels. You should explore the
"spark.yarn.executor.nodeLabelConfiguration" property.


Cheers,
Asif Abbasi

On Tue, 7 Feb 2017 at 10:21, Alvaro Brandon  wrote:

> Hello all:
>
> I have the following scenario.
> - I have a cluster of 50 machines with Hadoop and Spark installed on them.
> - I want to launch one Spark application through spark submit. However I
> want this application to run on only a subset of these machines,
> disregarding data locality. (e.g. 10 machines)
>
> Is this possible?. Is there any option in the standalone scheduler, YARN
> or Mesos that allows such thing?.
>
>
>


Re: Launching an Spark application in a subset of machines

2017-02-07 Thread Alvaro Brandon
I want to scale the number of machines used up or down depending on the
SLA of a job. For example, if I have a low-priority job I will give it 10
machines, while a high-priority one will be given 50. I also want to choose
subsets depending on the hardware, for example "Launch this job only on
machines with GPUs".

Looking into Mesos attributes, this seems like the perfect fit for it. Is that
correct?

2017-02-07 12:27 GMT+01:00 Jörn Franke :

> If you want to run them always on the same machines use yarn node labels.
> If it is any 10 machines then use capacity or fair scheduler.
>
> What is the use case for running it always on the same 10 machines. If it
> is for licensing reasons then I would ask your vendor if this is a suitable
> mean to ensure license compliance. Otherwise dedicated cluster.
>
> On 7 Feb 2017, at 12:09, Alvaro Brandon  wrote:
>
> Hello Pavel:
>
> Thanks for the pointers.
>
> For standalone cluster manager: I understand that I just have to start
> several masters with a subset of slaves attached. Then each master will
> listen on a different pair of , allowing me to spark-submit
> to any of these pairs depending on the subset of machines I want to use.
>
> For Mesos: I haven't used Mesos much. Any references or documentation I
> can use to set this up?
>
> Best Regards
>
>
>
> 2017-02-07 11:36 GMT+01:00 Pavel Plotnikov  >:
>
>> Hi, Alvaro
>> You can create different clusters using standalone cluster manager, and
>> than manage subset of machines through submitting application on different
>> masters. Or you can use Mesos attributes to mark subset of workers and
>> specify it in spark.mesos.constraints
>>
>>
>> On Tue, Feb 7, 2017 at 1:21 PM Alvaro Brandon 
>> wrote:
>>
>>> Hello all:
>>>
>>> I have the following scenario.
>>> - I have a cluster of 50 machines with Hadoop and Spark installed on
>>> them.
>>> - I want to launch one Spark application through spark submit. However I
>>> want this application to run on only a subset of these machines,
>>> disregarding data locality. (e.g. 10 machines)
>>>
>>> Is this possible?. Is there any option in the standalone scheduler, YARN
>>> or Mesos that allows such thing?.
>>>
>>>
>>>
>


Re: Launching an Spark application in a subset of machines

2017-02-07 Thread Jörn Franke
If you want to run them always on the same machines, use YARN node labels. If it
is any 10 machines, then use the capacity or fair scheduler.

What is the use case for running it always on the same 10 machines? If it is
for licensing reasons, then I would ask your vendor whether this is a suitable
means to ensure license compliance. Otherwise, use a dedicated cluster.

> On 7 Feb 2017, at 12:09, Alvaro Brandon  wrote:
> 
> Hello Pavel:
> 
> Thanks for the pointers. 
> 
> For standalone cluster manager: I understand that I just have to start 
> several masters with a subset of slaves attached. Then each master will 
> listen on a different pair of , allowing me to spark-submit to 
> any of these pairs depending on the subset of machines I want to use.
> 
> For Mesos: I haven't used Mesos much. Any references or documentation I can 
> use to set this up?
> 
> Best Regards
> 
> 
> 
> 2017-02-07 11:36 GMT+01:00 Pavel Plotnikov :
>> Hi, Alvaro
>> You can create different clusters using standalone cluster manager, and than 
>> manage subset of machines through submitting application on different 
>> masters. Or you can use Mesos attributes to mark subset of workers and 
>> specify it in spark.mesos.constraints
>> 
>> 
>>> On Tue, Feb 7, 2017 at 1:21 PM Alvaro Brandon  
>>> wrote:
>>> Hello all:
>>> 
>>> I have the following scenario. 
>>> - I have a cluster of 50 machines with Hadoop and Spark installed on them. 
>>> - I want to launch one Spark application through spark submit. However I 
>>> want this application to run on only a subset of these machines, 
>>> disregarding data locality. (e.g. 10 machines)
>>> 
>>> Is this possible?. Is there any option in the standalone scheduler, YARN or 
>>> Mesos that allows such thing?.
>>> 
>>> 
> 


Re: Launching an Spark application in a subset of machines

2017-02-07 Thread Alvaro Brandon
Hello Pavel:

Thanks for the pointers.

For the standalone cluster manager: I understand that I just have to start
several masters with a subset of slaves attached. Then each master will
listen on a different host/port pair, allowing me to spark-submit
to any of these pairs depending on the subset of machines I want to use.

For Mesos: I haven't used Mesos much. Any references or documentation I can
use to set this up?

Best Regards



2017-02-07 11:36 GMT+01:00 Pavel Plotnikov :

> Hi, Alvaro
> You can create different clusters using standalone cluster manager, and
> than manage subset of machines through submitting application on different
> masters. Or you can use Mesos attributes to mark subset of workers and
> specify it in spark.mesos.constraints
>
>
> On Tue, Feb 7, 2017 at 1:21 PM Alvaro Brandon 
> wrote:
>
>> Hello all:
>>
>> I have the following scenario.
>> - I have a cluster of 50 machines with Hadoop and Spark installed on
>> them.
>> - I want to launch one Spark application through spark submit. However I
>> want this application to run on only a subset of these machines,
>> disregarding data locality. (e.g. 10 machines)
>>
>> Is this possible?. Is there any option in the standalone scheduler, YARN
>> or Mesos that allows such thing?.
>>
>>
>>


Re: Launching an Spark application in a subset of machines

2017-02-07 Thread Pavel Plotnikov
Hi, Alvaro
You can create different clusters using the standalone cluster manager, and
then manage subsets of machines by submitting applications to different
masters. Or you can use Mesos attributes to mark a subset of workers and
specify it in spark.mesos.constraints.
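
A sketch of the Mesos-attributes route (the attribute name and value below are
made up and must match attributes actually configured on the Mesos agents):

import org.apache.spark.{SparkConf, SparkContext}

// Only accept Mesos offers from agents whose 'gpu' attribute equals 'true'.
// spark.mesos.constraints is matched against the attributes advertised by each agent.
val conf = new SparkConf()
  .setAppName("subset-app")
  .set("spark.mesos.constraints", "gpu:true")

val sc = new SparkContext(conf)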


On Tue, Feb 7, 2017 at 1:21 PM Alvaro Brandon 
wrote:

> Hello all:
>
> I have the following scenario.
> - I have a cluster of 50 machines with Hadoop and Spark installed on them.
> - I want to launch one Spark application through spark submit. However I
> want this application to run on only a subset of these machines,
> disregarding data locality. (e.g. 10 machines)
>
> Is this possible?. Is there any option in the standalone scheduler, YARN
> or Mesos that allows such thing?.
>
>
>


Launching an Spark application in a subset of machines

2017-02-07 Thread Alvaro Brandon
Hello all:

I have the following scenario.
- I have a cluster of 50 machines with Hadoop and Spark installed on them.
- I want to launch one Spark application through spark-submit. However, I
want this application to run on only a subset of these machines (e.g. 10
machines), disregarding data locality.

Is this possible? Is there any option in the standalone scheduler, YARN or
Mesos that allows such a thing?


Re: spark architecture question -- Pleas Read

2017-02-07 Thread Alex
Hi All,

So will there be any performance difference if we recode the entire logic in
Spark SQL instead of running the Hive Java native UDFs in spark-shell using a
HiveContext?

Or does Spark anyway convert the Hive Java UDFs to Spark SQL code, so we don't
need to rewrite the entire logic in Spark SQL?
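
For context, a sketch of the first variant, calling an existing Hive Java UDF
through a HiveContext from spark-shell (the jar path, class name and table name
are placeholders):

// spark-shell provides `sc`; build a HiveContext on top of it (Spark 1.x style,
// still available in Spark 2.x for compatibility).
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Register the existing Hive Java UDF and call it from SQL. Spark executes the
// UDF through a wrapper; it does not translate the Java code into Spark SQL.
hiveContext.sql("ADD JAR /path/to/my-udfs.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.hive.MyUdf'")
hiveContext.sql("SELECT my_udf(col1) FROM my_table").show()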

On Mon, Feb 6, 2017 at 2:40 AM, kuassi mensah 
wrote:

> Apology in advance for injecting Oracle product in this discussion but I
> thought it might help address the requirements (as far as I understood
> these).
> We are looking into furnishing for Spark a new connector similar to the
> Oracle Datasource for Hadoop,
>
> which
> will implement the Spark DataSource interfaces for Oracle Database.
>
> In summary, it'll allow:
>
>- allow parallel and direct access to the Oracle database (with option
>to control the number of concurrent connections)
>- introspect the Oracle table then dynamically generate partitions of
>Spark JDBCRDDs based on the split pattern and rewrite Spark SQL queries
>into Oracle SQL queries for each partition. The typical use case consists
>in joining fact data (or Big Data) with master data in Oracle.
>- hooks in Oracle JDBC driver for faster type conversions
>- Implement predicate pushdown, partition pruning, column projections
>to the Oracle database, thereby reducing the amount of data to be processed
>on Spark
>- write back to Oracle table (through paralllel insert) the result of
>SparkSQL processing for further mining by traditional BI tools.
>
> You may reach out to me offline for ore details if interested,
>
> Kuassi
>
>
> On 1/29/2017 3:39 AM, Mich Talebzadeh wrote:
>
> This is classis nothing special about it.
>
>
>1. You source is Oracle schema tables
>2. You can use Oracle JDBC connection with DIRECT CONNECT and parallel
>processing to read your data from Oracle table into Spark FP using JDBC.
>Ensure that you are getting data from Oracle DB at a time when the DB is
>not busy and network between your Spark and Oracle is reasonable. You will
>be creating multiple connections to your Oracle database from Spark
>3. Create a DF from RDD and ingest your data into Hive staging tables.
>This should be pretty fast. If you are using a recent version of Spark >
>1.5 you can see this in Spark GUI
>4. Once data is ingested into Hive table (frequency Discrete,
>Recurring or Cumulative), then you have your source data in Hive
>5. Do your work in Hive staging tables and then your enriched data
>will go into Hive enriched tables (different from your staging tables). You
>can use Spark to enrich (transform) your data on Hive staging tables
>6. Then use Spark to send that data into Oracle table. Again bear in
>mind that the application has to handle consistency from Big Data into
>RDBMS. For example what you are going to do with failed transactions in
>Oracle
>7. From my experience you also need some  staging tables in Oracle to
>handle inserts from Hive via Spark into Oracle table
>8. Finally run a job in PL/SQL to load Oracle target tables from
>Oracle staging tables
>
> Notes:
>
> Oracle columns types are 100% compatible with Spark. For example Spark
> does not recognize CHAR column and that has to be converted into VARCHAR or
> STRING.
> Hive does not have the concept of Oracle "WITH CLAUSE" inline table. So
> that script that works in Oracle may not work in Hive. Windowing functions
> should be fine.
>
> I tend to do all this via shell script that gives control at each layer
> and creates alarms.
>
> HTH
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 29 January 2017 at 10:18, Alex  wrote:
>
>> Hi All,
>>
>> Thanks for your response .. Please find below flow diagram
>>
>> Please help me out simplifying this architecture using Spark
>>
>> 1) Can i skip step 1 to step 4 and directly store it in spark
>> if I am storing it in spark where actually it is getting stored
>> Do i need to retain HAdoop to store data
>> or can i directly store it in spark and remove hadoop also?
>>
>> I want to remove informatica for preprocessing and directly load the
>> files data coming from server to Hadoop/Spark
>>
>> So My Question is Can i directly