Re: A bug in spark or hadoop RPC with kerberos authentication?

2017-08-22 Thread 周康
You can check out the Hadoop credential classes in Spark's YARN module. During
spark-submit, Spark will use the configuration found on the classpath.
How do you reference your own config?


Spark Authentication on Mesos Client Mode

2017-08-22 Thread Kalvin Chau
Hi,

Is it possible to use spark.authenticate with shared secrets in mesos
client mode? I can get the driver to start up and expect to use
authenticated channels, but when the executors start up the SecurityManager
outputs "authentication: disabled".

Looking through the CoarseGrainedSchedulerBackend code, it just creates a
new SparkEnv (and doesn't look like it loads spark-defaults.conf) and
attempts to fetch properties from the driver.
But the driver is expecting an authentication challenge and bails when it
realizes the executor is not attempting the challenge.

Gist of some of the logs
https://gist.github.com/kalvinnchau/53c60c91a0e1b47ad0afd9fe06d81101

Relevant code here, I don't see where the executor would load up auth
settings and grab the shared secret.

  // Bootstrap to fetch the driver's Spark properties.
  val executorConf = new SparkConf
  val port = executorConf.getInt("spark.executor.port", 0)
  val fetcher = RpcEnv.create(
    "driverPropsFetcher",
    hostname,
    port,
    executorConf,
    new SecurityManager(executorConf),
    clientMode = true)
  val driver = fetcher.setupEndpointRefByURI(driverUrl)
  val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
  val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
  fetcher.shutdown()


Is there some setting I'm missing to get the executor backend to pull in
authentication settings?
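For reference, these are the shared-secret settings under discussion, as spark-defaults.conf entries (a sketch; the secret value is a placeholder):

```properties
spark.authenticate          true
# placeholder value -- use your own shared secret
spark.authenticate.secret   my-shared-secret
```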

Thanks,
Kalvin


Re: Spark submit OutOfMemory Error in local mode

2017-08-22 Thread Naga G
Increase the cores, as you're trying to run multiple threads

Sent from Naga iPad

> On Aug 22, 2017, at 3:26 PM, "u...@moosheimer.com"  
> wrote:
> 
> Since you didn't post any concrete information, it's hard to give you any 
> advice.
> 
> Try to increase the executor memory (spark.executor.memory).
> If that doesn't help, give all the experts in the community a chance to help 
> you by adding more details like version, logfiles, source, etc.
> 
> Mit freundlichen Grüßen / best regards
> Kay-Uwe Moosheimer
> 
>> Am 22.08.2017 um 20:16 schrieb shitijkuls :
>> 
>> Any help here will be appreciated.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-OutOfMemory-Error-in-local-mode-tp29081p29096.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> 



[Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-08-22 Thread Karthik Palaniappan
I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.
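For reference, the two flag combinations compared above, written as spark-defaults.conf entries (a sketch of what was passed via --conf):

```properties
# Attempted: streaming dynamic allocation -- no executors were ever allocated
spark.streaming.dynamicAllocation.enabled  true
spark.dynamicAllocation.enabled            false
spark.executor.instances                   0

# Working baseline: Core dynamic allocation
spark.dynamicAllocation.enabled            true
spark.streaming.dynamicAllocation.enabled  false
```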


Re: Spark submit OutOfMemory Error in local mode

2017-08-22 Thread u...@moosheimer.com
Since you didn't post any concrete information, it's hard to give you any advice.

Try to increase the executor memory (spark.executor.memory).
If that doesn't help, give all the experts in the community a chance to help you 
by adding more details like version, logfiles, source, etc.

Mit freundlichen Grüßen / best regards
Kay-Uwe Moosheimer

> Am 22.08.2017 um 20:16 schrieb shitijkuls :
> 
> Any help here will be appreciated.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-OutOfMemory-Error-in-local-mode-tp29081p29096.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 





[Streaming][Structured Streaming] Understanding dynamic allocation in streaming jobs

2017-08-22 Thread Karthik Palaniappan
I'm trying to understand dynamic allocation in Spark Streaming and Structured 
Streaming. It seems if you set spark.dynamicAllocation.enabled=true, both 
frameworks use Core's dynamic allocation algorithm -- request executors if the 
task backlog is a certain size, and remove executors if they idle for a certain 
period of time.


However, as this Cloudera post points out, that algorithm doesn't really make 
sense for streaming: 
https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_streaming.html.
 When writing a toy streaming job, I did run into the issue where executors are 
never removed. Cloudera's suggestion of turning off dynamic allocation seems 
unreasonable -- Spark applications should grow/shrink to match their workload.


I see that Spark Streaming has its own (undocumented) configuration for dynamic 
allocation: https://issues.apache.org/jira/browse/SPARK-12133. Is that actually 
a supported feature? Or was that just an experiment? I had trouble getting this 
to work, but I'll follow up in a different thread.
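For anyone experimenting, these are the knobs read by streaming's ExecutorAllocationManager (undocumented as of Spark 2.2, so treat the names as subject to change; the values shown are illustrative, not defaults):

```properties
spark.streaming.dynamicAllocation.enabled           true
spark.streaming.dynamicAllocation.minExecutors      1
spark.streaming.dynamicAllocation.maxExecutors      20
spark.streaming.dynamicAllocation.scalingInterval   60
spark.streaming.dynamicAllocation.scalingUpRatio    0.9
spark.streaming.dynamicAllocation.scalingDownRatio  0.3
```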


Also, does Structured Streaming have its own dynamic allocation algorithm?


Thanks,

Karthik Palaniappan



What is the right way to stop a streaming application?

2017-08-22 Thread shyla deshpande
Hi all,

I am running a spark streaming application on AWS EC2 cluster in standalone
mode. I am using DStreams and Spark 2.0.2

I do have the setting stopGracefullyOnShutdown set to true. What is the right
way to stop the streaming application?
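One common pattern, hedged since it is not verified against a cluster here: with spark.streaming.stopGracefullyOnShutdown=true, sending SIGTERM (not SIGKILL) to the driver lets Spark's own shutdown hook stop the context gracefully; alternatively, call ssc.stop(true, true) yourself from a shutdown hook or a control path. A minimal JVM sketch with the StreamingContext stubbed out:

```java
public class GracefulStopSketch {
    // Stand-in for StreamingContext.stop(stopSparkContext, stopGracefully);
    // a real app would call ssc.stop(true, true) here instead.
    static String stopStreamingContext(boolean stopSparkContext, boolean stopGracefully) {
        return "stopped(sparkContext=" + stopSparkContext
             + ", gracefully=" + stopGracefully + ")";
    }

    public static void main(String[] args) {
        // With spark.streaming.stopGracefullyOnShutdown=true, Spark registers
        // a hook much like this one itself, triggered by SIGTERM / JVM exit.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
            System.out.println(stopStreamingContext(true, true))));
        System.out.println("running; send SIGTERM to stop gracefully");
    }
}
```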

Thanks


Re: Spark submit OutOfMemory Error in local mode

2017-08-22 Thread shitijkuls
Any help here will be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-OutOfMemory-Error-in-local-mode-tp29081p29096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Update MySQL table via Spark/SparkR?

2017-08-22 Thread Pierce Lamb
Hi Jake,

There is another option among the third-party projects in the Spark
database ecosystem that have combined Spark with a DBMS in such a way that
the DataFrame API has been extended to include UPDATE operations.
However, in your case you would have to move away from MySQL in order to
use this API.

Best,

Pierce

On Tue, Aug 22, 2017 at 7:54 AM, Jake Russ 
wrote:

> Hi Mich,
>
>
>
> Thank you for the explanation, that makes sense, and is helpful for me to
> understand the bigger picture between Spark/RDBMS.
>
>
>
> Happy to know I’m already following best practice.
>
>
>
> Cheers,
>
>
>
> Jake
>
>
>
> From: Mich Talebzadeh 
> Date: Monday, August 21, 2017 at 6:44 PM
> To: Jake Russ 
> Cc: "user@spark.apache.org" 
> Subject: Re: Update MySQL table via Spark/SparkR?
>
>
>
> Hi Jake,
>
> This is an issue across all RDBMSs, including Oracle. When you are
> updating, you have to commit or roll back in the RDBMS itself, and I am not
> aware of Spark doing that.
>
> The staging table is a safer method as it follows ETL type approach. You
> create new data in the staging table in RDBMS and do the DML in the RDBMS
> itself where you can control commit or rollback. That is the way I would do
> it. A simple shell script can do both.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
> On 21 August 2017 at 15:50, Jake Russ  wrote:
>
> Hi everyone,
>
>
>
> I’m currently using SparkR to read data from a MySQL database, perform
> some calculations, and then write the results back to MySQL. Is it still
> true that Spark does not support UPDATE queries via JDBC? I’ve seen many
> posts on the internet that Spark’s DataFrameWriter does not support
> UPDATE queries via JDBC. It will only
> “append” or “overwrite” to existing tables. The best advice I’ve found so
> far, for performing this update, is to write to a staging table in MySQL
> and then perform the UPDATE query on the MySQL side.
>
>
>
> Ideally, I’d like to handle the update during the write operation. Has
> anyone else encountered this limitation and have a better solution?
>
>
>
> Thank you,
>
>
>
> Jake
>
>
>


A bug in spark or hadoop RPC with kerberos authentication?

2017-08-22 Thread Sun, Keith
Hello ,

I hit this very weird issue which, while easy to reproduce, stuck me for more 
than a day. I suspect it may be an issue/bug related to the class loader.
Can you help confirm the root cause?

I want to specify a customized Hadoop configuration set instead of the one on the 
classpath (we have a few Hadoop clusters, all with Kerberos security, and I 
want to support a different configuration per cluster).
Code/error below.


The workaround I found is to place a core-site.xml with the two properties 
below on the classpath.
From checking the RPC code under org.apache.hadoop.ipc.RPC, I suspect the RPC 
code may not see the UGI class in the same classloader,
so UGI is initialized with the default value from the classpath, which is simple 
authentication.

core-site.xml with the security setup on the classpath:


<configuration>
  <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
  </property>
  <property>
    <name>hadoop.security.authorization</name>
    <value>true</value>
  </property>
</configuration>




error--
2673 [main] DEBUG 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil  - 
DataTransferProtocol using SaslPropertiesResolver, configured QOP 
dfs.data.transfer.protection = privacy, configured class 
dfs.data.transfer.saslproperties.resolver.class = class 
org.apache.hadoop.security.WhitelistBasedResolver
2696 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service: 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED
2744 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - 
PrivilegedAction as:x@xxxCOM (auth:KERBEROS) 
from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:136) //
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC  - Creating YarnRPC for 
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC  - Creating a 
HadoopYarnProtoRpc proxy for protocol interface 
org.apache.hadoop.yarn.api.ApplicationClientProtocol
2801 [main] DEBUG org.apache.hadoop.ipc.Client  - getting client out of cache: 
org.apache.hadoop.ipc.Client@748fe51d
2981 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
3004 [main] DEBUG org.apache.hadoop.ipc.Client  - The ping interval is 6 ms.
3005 [main] DEBUG org.apache.hadoop.ipc.Client  - Connecting to 
yarn-rm-1/x:8032
3019 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx: starting, having 
connections 1
3020 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client  - 
IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx 
sending #0
3025 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx got value #-1
3026 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - closing ipc connection to 
yarn-rm-1/x:8032: SIMPLE authentication is not enabled.  Available:[TOKEN, 
KERBEROS]
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
at 
org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1131)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
---code---

  Configuration hc = new Configuration(false);

  hc.addResource("myconf/yarn-site.xml");
  hc.addResource("myconf/core-site.xml");
  hc.addResource("myconf/hdfs-site.xml");
  hc.addResource("myconf/hive-site.xml");

  SparkConf sc = new SparkConf(true);
  // Add the config to the Spark conf, as there is no XML on the classpath
  // except the "default.xml" files from the Hadoop jars.
  hc.forEach(entry -> {
      if (entry.getKey().startsWith("hive")) {
          sc.set(entry.getKey(), entry.getValue());
      } else {
          sc.set("spark.hadoop." + entry.getKey(), entry.getValue());
      }
  });

  UserGroupInformation.setConfiguration(hc);
  UserGroupInformation.loginUserFromKeytab(Principal, Keytab);

  System.out.println("spark-conf##");
  System.out.println(sc.toDebugString());

  SparkSession sparkSession = SparkSession
      .builder()
      .master("yarn-client") // "yarn-client", "local"
      .config(sc)
      .appName(SparkEAZDebug.class.getName())
      .enableHiveSupport()
      .getOrCreate();

Thanks very much.
Keith



Re: Update MySQL table via Spark/SparkR?

2017-08-22 Thread Jake Russ
Hi Mich,

Thank you for the explanation, that makes sense, and is helpful for me to 
understand the bigger picture between Spark/RDBMS.

Happy to know I’m already following best practice.

Cheers,

Jake

From: Mich Talebzadeh 
Date: Monday, August 21, 2017 at 6:44 PM
To: Jake Russ 
Cc: "user@spark.apache.org" 
Subject: Re: Update MySQL table via Spark/SparkR?

Hi Jake,
This is an issue across all RDBMSs, including Oracle. When you are updating, 
you have to commit or roll back in the RDBMS itself, and I am not aware of 
Spark doing that.
The staging table is a safer method as it follows ETL type approach. You create 
new data in the staging table in RDBMS and do the DML in the RDBMS itself where 
you can control commit or rollback. That is the way I would do it. A simple 
shell script can do both.
HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 21 August 2017 at 15:50, Jake Russ 
> wrote:
Hi everyone,

I’m currently using SparkR to read data from a MySQL database, perform some 
calculations, and then write the results back to MySQL. Is it still true that 
Spark does not support UPDATE queries via JDBC? I’ve seen many posts on the 
internet that Spark’s DataFrameWriter does not support UPDATE queries via 
JDBC. It will only “append” 
or “overwrite” to existing tables. The best advice I’ve found so far, for 
performing this update, is to write to a staging table in MySQL 
and then perform the UPDATE query on the MySQL side.
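A sketch of that staging-table pattern, with the table and column names made up and the Spark write shown only in comments (the helper just builds the MySQL-side statement):

```java
public class StagingMergeSketch {
    // Staging-table pattern: Spark appends results to a staging table, then
    // one SQL statement run on the MySQL side merges staging into the target.
    // Table and column names here are hypothetical.
    static String mergeSql(String target, String staging, String key, String col) {
        return "UPDATE " + target + " t JOIN " + staging + " s ON t." + key
             + " = s." + key + " SET t." + col + " = s." + col;
    }

    public static void main(String[] args) {
        // In Spark (sketch):
        //   results.write().mode("append").jdbc(url, "results_staging", props);
        // then execute the statement below over any MySQL/JDBC connection.
        System.out.println(mergeSql("results", "results_staging", "id", "score"));
    }
}
```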

Ideally, I’d like to handle the update during the write operation. Has anyone 
else encountered this limitation and have a better solution?

Thank you,

Jake



Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-22 Thread Cody Koeninger
Kafka RDDs need to start from a specified offset; you really don't
want the executors just starting at whatever offset happened to be
latest at the time they ran.

If you need a way to figure out the latest offset at the time the
driver starts up, you can always use a consumer to read the offsets
and then pass that to Assign (just make sure that consumer is closed
before the job starts so you don't get group id conflicts).  You can
even make your own implementation of ConsumerStrategy, which should
allow you to do pretty much whatever you need to get the consumer in
the state you want.
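Cody's suggestion as a runnable pattern, with the Kafka calls stubbed out (the real flow, shown in comments, needs kafka-clients and the spark-streaming-kafka-0-10 artifact, which aren't assumed here):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestOffsetsSketch {
    // Stand-in for KafkaConsumer#endOffsets: maps "topic-partition" keys to
    // the latest offset; here every partition is pretended to end at 100.
    static Map<String, Long> endOffsets(List<String> partitions) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (String tp : partitions) out.put(tp, 100L);
        return out;
    }

    public static void main(String[] args) {
        // Real flow (sketch):
        //   KafkaConsumer<?, ?> c = new KafkaConsumer<>(kafkaParams);
        //   Map<TopicPartition, Long> latest = c.endOffsets(partitions);
        //   c.close(); // close BEFORE the job starts to avoid group.id conflicts
        //   ConsumerStrategies.Assign(topicPartitions, kafkaParams, latest);
        System.out.println(endOffsets(List.of("events-0", "events-1")));
    }
}
```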

On Mon, Aug 21, 2017 at 6:57 PM, swetha kasireddy
 wrote:
> Hi Cody,
>
> I think Assign is used if we want it to start from a specified offset.
> What if we want it to start from the latest offset, something like the
> behavior of "auto.offset.reset" -> "latest"?
>
>
> Thanks!
>
> On Mon, Aug 21, 2017 at 9:06 AM, Cody Koeninger  wrote:
>>
>> Yes, you can start from specified offsets.  See ConsumerStrategy,
>> specifically Assign
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store
>>
>> On Tue, Aug 15, 2017 at 1:18 PM, SRK  wrote:
>> > Hi,
>> >
>> > How to force Spark Kafka Direct to start from the latest offset when the
>> > lag
>> > is huge in kafka 10? It seems to be processing from the latest offset
>> > stored
>> > for a group id. One way to do this is to change the group id. But it
>> > would
>> > mean that each time that we need to process the job from the latest
>> > offset
>> > we have to provide a new group id.
>> >
>> > Is there a way to force the job to run from the latest offset in case we
>> > need to and still use the same group id?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>




Fwd: ORC Transaction Table - Spark

2017-08-22 Thread Aviral Agarwal
Hi,

I am trying to read hive orc transaction table through Spark but I am
getting the following error

Caused by: java.lang.RuntimeException: serious problem
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(
OrcInputFormat.java:1021)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(
OrcInputFormat.java:1048)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
.
Caused by: java.util.concurrent.ExecutionException:
java.lang.NumberFormatException:
For input string: "0645253_0001"
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(
OrcInputFormat.java:998)
... 118 more

Any help would be appreciated.

Thanks and Regards,
Aviral Agarwal


Re: UI for spark machine learning.

2017-08-22 Thread Sea aj
Jorn,

My question is not about the model type but about Spark's ability to reuse
an already trained ML model when training a new model.



On Tue, Aug 22, 2017 at 1:13 PM, Jörn Franke  wrote:

> Is it really required to have one billion samples for just linear
> regression? Probably your model would do equally well with far fewer
> samples. Have you checked bias and variance if you use far fewer random
> samples?
>
> On 22. Aug 2017, at 12:58, Sea aj  wrote:
>
> I have a large dataframe of 1 billion rows of type LabeledPoint. I tried
> to train a linear regression model on the df but it failed due to lack of
> memory although I'm using 9 slaves, each with 100gb of ram and 16 cores of
> CPU.
>
> I decided to split my data into multiple chunks and train the model in
> multiple phases, but I learned that the linear regression model in the ml
> library does not have a "setInitialModel" function to be able to pass the
> trained model from one chunk to the rest of the chunks. In other words,
> each time I call the fit function over a chunk of my data, it overwrites
> the previous model.
>
> So far the only solution I found is using Spark Streaming to be able to
> split the data to multiple dfs and then train over each individually to
> overcome memory issue.
>
> Do you know if there's any other solution?
>
>
>
>
> On Mon, Jul 10, 2017 at 7:57 AM, Jayant Shekhar 
> wrote:
>
>> Hello Mahesh,
>>
>> We have built one. You can download from here :
>> https://www.sparkflows.io/download
>>
>> Feel free to ping me for any questions, etc.
>>
>> Best Regards,
>> Jayant
>>
>>
>> On Sun, Jul 9, 2017 at 9:35 PM, Mahesh Sawaiker <
>> mahesh_sawai...@persistent.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> 1) Is anyone aware of any workbench kind of tool to run ML jobs in
>>> Spark? Specifically, could the tool be something like a web application
>>> that is configured to connect to a Spark cluster?
>>>
>>>
>>> The user would be able to select input training sets, probably from
>>> HDFS, train, and then run predictions, without having to write any Scala code.
>>>
>>>
>>> 2) If there is no such tool, is there value in having one, and what could
>>> be the challenges?
>>>
>>>
>>> Thanks,
>>>
>>> Mahesh
>>>
>>>
>>> DISCLAIMER
>>> ==
>>> This e-mail may contain privileged and confidential information which is
>>> the property of Persistent Systems Ltd. It is intended only for the use of
>>> the individual or entity to which it is addressed. If you are not the
>>> intended recipient, you are not authorized to read, retain, copy, print,
>>> distribute or use this message. If you have received this communication in
>>> error, please notify the sender and delete all copies of this message.
>>> Persistent Systems Ltd. does not accept any liability for virus infected
>>> mails.
>>>
>>
>>
>


Re: UI for spark machine learning.

2017-08-22 Thread Jörn Franke
Is it really required to have one billion samples for just linear regression? 
Probably your model would do equally well with far fewer samples. Have you 
checked bias and variance if you use far fewer random samples?

> On 22. Aug 2017, at 12:58, Sea aj  wrote:
> 
> I have a large dataframe of 1 billion rows of type LabeledPoint. I tried to 
> train a linear regression model on the df but it failed due to lack of memory 
> although I'm using 9 slaves, each with 100gb of ram and 16 cores of CPU.
> 
> I decided to split my data into multiple chunks and train the model in 
> multiple phases, but I learned that the linear regression model in the ml library 
> does not have a "setInitialModel" function to be able to pass the trained model 
> from one chunk to the rest of the chunks. In other words, each time I call the 
> fit function over a chunk of my data, it overwrites the previous model.
> 
> So far the only solution I found is using Spark Streaming to be able to split 
> the data to multiple dfs and then train over each individually to overcome 
> memory issue.
> 
> Do you know if there's any other solution?
> 
> 
> 
> 
>> On Mon, Jul 10, 2017 at 7:57 AM, Jayant Shekhar  
>> wrote:
>> Hello Mahesh,
>> 
>> We have built one. You can download from here : 
>> https://www.sparkflows.io/download
>> 
>> Feel free to ping me for any questions, etc.
>> 
>> Best Regards,
>> Jayant
>> 
>> 
>>> On Sun, Jul 9, 2017 at 9:35 PM, Mahesh Sawaiker 
>>>  wrote:
>>> Hi,
>>> 
>>> 
>>> 1) Is anyone aware of any workbench kind of tool to run ML jobs in Spark? 
>>> Specifically, could the tool be something like a web application that is 
>>> configured to connect to a Spark cluster?
>>> 
>>> 
>>> The user would be able to select input training sets, probably from HDFS, 
>>> train, and then run predictions, without having to write any Scala code. 
>>> 
>>> 2) If there is no such tool, is there value in having one, and what could be 
>>> the challenges?
>>> 
>>> 
>>> Thanks,
>>> 
>>> Mahesh
>>> 
>>> DISCLAIMER
>>> ==
>>> This e-mail may contain privileged and confidential information which is 
>>> the property of Persistent Systems Ltd. It is intended only for the use of 
>>> the individual or entity to which it is addressed. If you are not the 
>>> intended recipient, you are not authorized to read, retain, copy, print, 
>>> distribute or use this message. If you have received this communication in 
>>> error, please notify the sender and delete all copies of this message. 
>>> Persistent Systems Ltd. does not accept any liability for virus infected 
>>> mails.
>> 
> 


Re: UI for spark machine learning.

2017-08-22 Thread Sea aj
I have a large dataframe of 1 billion rows of type LabeledPoint. I tried to
train a linear regression model on the df but it failed due to lack of
memory although I'm using 9 slaves, each with 100gb of ram and 16 cores of
CPU.

I decided to split my data into multiple chunks and train the model in
multiple phases, but I learned that the linear regression model in the ml
library does not have a "setInitialModel" function to be able to pass the
trained model from one chunk to the rest of the chunks. In other words, each
time I call the fit function over a chunk of my data, it overwrites the
previous model.

So far the only solution I found is using Spark Streaming to be able to
split the data to multiple dfs and then train over each individually to
overcome memory issue.

Do you know if there's any other solution?
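For what it's worth, the older mllib API does expose an initial-state hook (StreamingLinearRegressionWithSGD.setInitialWeights); whether that fits your pipeline is untested here. The chunk-to-chunk idea itself looks like this toy sketch (plain Java, no Spark; the data and learning rates are made up):

```java
import java.util.List;

public class ChunkedSgdSketch {
    // Toy illustration of chunked training: run gradient descent on one
    // chunk, then CONTINUE from the resulting weight on the next chunk --
    // the capability the ml LinearRegression API lacks (no setInitialModel).
    // Model: y = w * x with squared loss; the data below follows y = 3x.
    static double sgdPass(double w, List<double[]> chunk, double lr) {
        for (double[] xy : chunk) {
            double grad = 2 * (w * xy[0] - xy[1]) * xy[0]; // d/dw of (w*x - y)^2
            w -= lr * grad;
        }
        return w;
    }

    public static void main(String[] args) {
        List<double[]> chunk1 = List.of(new double[]{1, 3}, new double[]{2, 6});
        List<double[]> chunk2 = List.of(new double[]{3, 9}, new double[]{4, 12});
        double w1 = sgdPass(0.0, chunk1, 0.05);
        double w2 = sgdPass(w1, chunk2, 0.01); // starts from w1, not from scratch
        System.out.println(w1 + " -> " + w2);  // w2 ends up closer to the true slope 3
    }
}
```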




On Mon, Jul 10, 2017 at 7:57 AM, Jayant Shekhar 
wrote:

> Hello Mahesh,
>
> We have built one. You can download from here : https://www.sparkflows.io/
> download
>
> Feel free to ping me for any questions, etc.
>
> Best Regards,
> Jayant
>
>
> On Sun, Jul 9, 2017 at 9:35 PM, Mahesh Sawaiker <
> mahesh_sawai...@persistent.com> wrote:
>
>> Hi,
>>
>>
>> 1) Is anyone aware of any workbench kind of tool to run ML jobs in Spark?
>> Specifically, could the tool be something like a web application that is
>> configured to connect to a Spark cluster?
>>
>>
>> The user would be able to select input training sets, probably from HDFS,
>> train, and then run predictions, without having to write any Scala code.
>>
>>
>> 2) If there is no such tool, is there value in having one, and what could
>> be the challenges?
>>
>>
>> Thanks,
>>
>> Mahesh
>>
>>
>> DISCLAIMER
>> ==
>> This e-mail may contain privileged and confidential information which is
>> the property of Persistent Systems Ltd. It is intended only for the use of
>> the individual or entity to which it is addressed. If you are not the
>> intended recipient, you are not authorized to read, retain, copy, print,
>> distribute or use this message. If you have received this communication in
>> error, please notify the sender and delete all copies of this message.
>> Persistent Systems Ltd. does not accept any liability for virus infected
>> mails.
>>
>
>


Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-22 Thread Jacek Laskowski
Hi,

Thanks a lot Burak for the explanation! I appreciate it a lot (and
promise to share the great news far and wide once I get the gist of
the internals myself)

What I'm missing is which part of Structured Streaming is responsible for
enforcing the semantics of output modes.

Once defined for a streaming query, output mode is used to create an
instance of StreamExecution (of the streaming query) that is only to
pass it along while creating IncrementalExecution. That
IncrementalExecution uses it only to fill out the missing pieces of
StateStoreSaveExec physical operator in a physical plan (of the
streaming query). What happens afterwards? What part of SSS uses the
metadata stored in the state checkpoint directory?

I could find UnsupportedOperationChecker that enforces that Complete
is used only for aggregations (groupBy and groupByKey) among other
checks. Is UnsupportedOperationChecker the enforcer with "others"
checking the mode at will?

I thought that output mode could be a feature of sinks and they would
know how to deal with the data. Or is this a feature of
IncrementalExecution (that is responsible for query execution)? Doh,
feel a bit overwhelmed with all the goodies and would appreciate some
more help.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Aug 19, 2017 at 12:10 AM, Burak Yavuz  wrote:
> Hi Jacek,
>
> The way the memory sink is architected at the moment is that it either
> appends a row (append/update mode) or replaces all rows (complete mode).
> When a user specifies a checkpoint location, the guarantee Structured
> Streaming provides is that output sinks will not lose data and will be able
> to serve the results according to the specified output mode.
>
> Now, what makes Complete mode so special for the memory sink? With
> aggregations, and complete mode, all the results are provided from the
> StateStores, therefore we can accept a checkpoint location (where the
> StateStores save the data), and we can recreate the memory table at each
> trigger.
>
> Why doesn't append mode work for a Memory Sink? The memory sink keeps data
> from previous triggers in-memory. It doesn't persist it anywhere. If you
> were to query the table after restarting your stream, all the data would've
> been lost, and in order to retrieve the existing state of the memory table,
> you would need to process all the data from scratch.
>
> Does all this make sense? Happy to elaborate.
>
> Best,
> Burak
>
>
>
>
>
> On Fri, Aug 18, 2017 at 12:52 PM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> This is what I could find in Spark's source code about the
>> `recoverFromCheckpointLocation` flag (that led me to explore the
>> complete output mode for dropDuplicates operator).
>>
>> `recoverFromCheckpointLocation` flag is enabled by default and varies
>> per sink (memory, console and others).
>>
>> * `memory` sink has the flag enabled for Complete output mode only
>>
>> * `foreach` sink has the flag always enabled
>>
>> * `console` sink has the flag always disabled
>>
>> * all other sinks have the flag always enabled
>>
>> As agreed with Michael
>> (https://issues.apache.org/jira/browse/SPARK-21667), the plan is to make
>> the console sink accept the flag as enabled, which would leave the memory
>> sink as the only one with the flag enabled for Complete output only.
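The per-sink behavior listed above can be condensed into a small decision function. The sketch below is a paraphrase in plain Python, not Spark's actual implementation, and reflects the behavior as described in this thread (i.e. before the SPARK-21667 change):

```python
def recover_from_checkpoint_location(sink: str, output_mode: str) -> bool:
    """Paraphrase of the per-sink `recoverFromCheckpointLocation` flag
    described above (toy model, not Spark source code)."""
    if sink == "memory":
        return output_mode == "complete"   # enabled for Complete output only
    if sink == "console":
        return False                       # always disabled
    return True                            # foreach and all other sinks: always enabled
```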
>>
>> And I thought I've been close to understand Structured Streaming :)
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Fri, Aug 18, 2017 at 9:21 PM, Holden Karau 
>> wrote:
>> > My assumption is it would be similar though: an in-memory sink of all of
>> > your records would quickly overwhelm your cluster, but with aggregation it
>> > could be reasonable. But there might be additional reasons on top of that.
>> >
>> > On Fri, Aug 18, 2017 at 11:44 AM Holden Karau 
>> > wrote:
>> >>
>> >> Ah yes I'm not sure about the workings of the memory sink.
>> >>
>> >> On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski 
>> >> wrote:
>> >>>
>> >>> Hi Holden,
>> >>>
>> >>> Thanks a lot for a bit more light on the topic. That however does not
>> >>> explain why memory sink requires Complete for a checkpoint location to
>> >>> work. The only reason I used Complete output mode was to meet the
>> >>> requirements of memory sink and that got me thinking why would the
>> >>> already-memory-hungry memory sink require yet another thing to get the
>> >>> query working.
>> >>>
>> >>> On to exploring the bits...
>> >>>
>> >>> Pozdrawiam,
>> >>> Jacek Laskowski
>> >>> 
>> >>> https://medium.com/@jaceklaskowski/
>> >>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> >>> Follow me at 
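The append-vs-complete distinction discussed above can be modeled with a toy sketch. The classes below are plain-Python stand-ins (not the real StateStore or memory sink), showing why a Complete-mode memory table is recoverable from checkpointed state while appended rows from earlier triggers are not:

```python
class StateStore:
    """Toy stand-in for the aggregation state that survives restarts
    via the checkpoint location."""
    def __init__(self):
        self.state = {}

    def update(self, rows):
        for key, value in rows:
            self.state[key] = value

    def snapshot(self):
        return list(self.state.items())


class MemorySink:
    """Toy stand-in for the memory sink: rows live only in this process."""
    def __init__(self):
        self.rows = []

    def add_batch(self, batch, mode):
        if mode == "complete":
            self.rows = list(batch)   # every trigger replaces the whole table
        else:                         # append/update
            self.rows.extend(batch)   # earlier triggers are kept only in memory


# Complete mode: after a "restart" the table is rebuilt from the state store.
store, sink = StateStore(), MemorySink()
store.update([("a", 1), ("b", 2)])
sink.add_batch(store.snapshot(), "complete")

restarted = MemorySink()                           # in-memory rows are gone...
restarted.add_batch(store.snapshot(), "complete")  # ...but fully recoverable
```

In append mode there is no equivalent: the rows of earlier triggers exist only in the sink's own memory, so a restarted query would have to reprocess everything to rebuild the table.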

Re: Scala closure exceeds ByteArrayOutputStream limit (~2gb)

2017-08-22 Thread Mungeol Heo
Hello, Joel.

Have you solved the problem caused by Java's 32-bit limit on array sizes?

Thanks.

On Wed, Jan 27, 2016 at 2:36 AM, Joel Keller  wrote:
> Hello,
>
> I am running RandomForest from mllib on a data-set which has very-high
> dimensional data (~50k dimensions).
>
> I get the following stack trace:
>
> 16/01/22 21:52:48 ERROR ApplicationMaster: User class threw exception:
> java.lang.OutOfMemoryError
> java.lang.OutOfMemoryError
> at
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
> at
> org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:624)
> at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235)
> at
> org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291)
> at
> org.apache.spark.mllib.tree.RandomForest.trainClassifier(RandomForest.scala)
> at com.miovision.cv.spark.CifarTrainer.main(CifarTrainer.java:108)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525
>
>
>
> I have determined that the problem is that when the ClosureCleaner checks
> that a closure is serializable (ensureSerializable), it serializes the
> closure to an underlying Java byte array, which is limited to about 2 GB
> (due to the signed 32-bit int used for array indices).
>
> I believe that the closure has grown very large due to the high number of
> features (dimensions), and the statistics that must be collected for them.
>
>
> Does anyone know if there is a way that I can make mllib's randomforest
> implementation limit the size here such that it will not exceed 2gb
> serialized-closures, or alternatively is there a way to allow spark to work
> with such a large closure?
>
>
> I am running this training on a very large cluster of very large machines,
> so RAM is not the problem here.  Problem is java's 32-bit limit on array
> sizes.
>
>
> Thanks,
>
> Joel
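The failure mode Joel describes (a closure whose serialized form exceeds what a single JVM byte array can hold) can be illustrated with a small Python stand-in. Here pickle plays the role of Java serialization, and the names and sizes are illustrative only; the real check happens on the JVM inside ClosureCleaner.ensureSerializable:

```python
import pickle

# A JVM array index is a signed 32-bit int, so ByteArrayOutputStream
# (which backs the serializer) tops out just under 2 GB.
JVM_MAX_ARRAY_BYTES = 2**31 - 1

def serialized_size(obj) -> int:
    """Measure the serialized payload before shipping it, roughly what
    ensureSerializable does (Python/pickle stand-in, not the JVM check)."""
    return len(pickle.dumps(obj))

# Stand-in for per-feature split statistics a training closure might capture.
# With ~50k features this easily fits; scale the per-feature payload up and
# the same check would blow past the 2 GB ceiling on the JVM.
stats = [0.0] * 50_000
assert serialized_size(stats) < JVM_MAX_ARRAY_BYTES
```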

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SPARK Issue in Standalone cluster

2017-08-22 Thread Sea aj
Hi everyone,

I have a huge dataframe with 1 billion rows, where each row is a nested list.
I want to train some ML models on this df, but due to the huge size I get an
out-of-memory error on one of my nodes when I run the fit function.

Currently, my configuration is:
144 cores, 16 cores for each of the 8 nodes,
100 GB of RAM for each slave and 100 GB of RAM for the driver. I set
spark.driver.maxResultSize to 20 GB.

Do you have any suggestion so far?

I can think of splitting the data into multiple dataframes and then training
the model on each individually, but besides the longer runtime, I learned
that the fit function overwrites the previous model each time I call it. Isn't
there a way to get the fit function to train the new model with regard to
the previously trained model?

Thanks
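One conceptual pattern for the chunk-by-chunk idea above is to carry summary state forward between chunks rather than refitting from scratch. The sketch below uses a toy running-mean "model" to show the shape of the idea; it is not an MLlib API, and most MLlib estimators' fit() indeed cannot warm-start from a previous model:

```python
def fit_incremental(chunks):
    """Toy 'model' (a running mean) updated chunk by chunk, carrying the
    (count, mean) state forward instead of refitting from scratch.
    Conceptual illustration only, not an MLlib API."""
    count, mean = 0, 0.0
    for chunk in chunks:
        for x in chunk:
            count += 1
            mean += (x - mean) / count   # Welford-style running update
    return mean
```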





On Sun, Aug 6, 2017 at 11:04 PM, Gourav Sengupta 
wrote:

> Hi Marco,
>
> thanks a ton, I will surely use those alternatives.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni 
> wrote:
>
>> Sengupta
>>  further to this, if you try the following notebook in Databricks cloud,
>> it will read a .csv file, write to a parquet file and read it again (just
>> to count the number of rows stored).
>> Please note that the path to the csv file might differ for you.
>> So, what you will need to do is:
>> 1 - create an account at community.cloud.databricks.com
>> 2 - upload the .csv file onto the Data section of your Databricks private cluster
>> 3 - run the script, which will store the data on the distributed
>> filesystem of the Databricks cloud (DBFS)
>>
>> It's worth investing in this free Databricks cloud as it can create a
>> cluster for you with minimal effort, and it's a very easy way to test your
>> Spark scripts on a real cluster
>>
>> hope this helps
>> kr
>>
>> ##
>> from pyspark.sql import SQLContext
>>
>> from random import randint
>> from time import sleep
>> from pyspark.sql.session import SparkSession
>> import logging
>> logger = logging.getLogger(__name__)
>> logger.setLevel(logging.INFO)
>> ch = logging.StreamHandler()
>> logger.addHandler(ch)
>>
>>
>> import sys
>>
>> def read_parquet_file(parquetFileName):
>>     logger.info('Reading now the parquet files we just created...:%s', parquetFileName)
>>     parquet_data = sqlContext.read.parquet(parquetFileName)
>>     logger.info('Parquet file has %s', parquet_data.count())
>>
>> def dataprocessing(filePath, count, sqlContext):
>>     logger.info('Iter count is:%s', count)
>>     if count == 0:
>>         print('exiting')
>>     else:
>>         df_traffic_tmp = sqlContext.read.format("csv").option("header", 'true').load(filePath)
>>         logger.info('#DataSet has:%s', df_traffic_tmp.count())
>>         logger.info('Writing to a parquet file')
>>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>>         # overwrite so repeated iterations don't fail on an existing path
>>         df_traffic_tmp.write.mode("overwrite").parquet(parquetFileName)
>>         sleepInterval = randint(10, 100)
>>         logger.info('#Sleeping for %s', sleepInterval)
>>         sleep(sleepInterval)
>>         read_parquet_file(parquetFileName)
>>         dataprocessing(filePath, count - 1, sqlContext)
>>
>> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'  # This path might differ for you
>> iterations = 1
>> logger.info('--')
>> logger.info('Filename:%s', filename)
>> logger.info('Iterations:%s', iterations)
>> logger.info('--')
>>
>> logger.info('Starting spark..Loading from %s for %s iterations', filename, iterations)
>> sc = SparkSession.builder.appName("Data Processing").getOrCreate()
>> logger.info('Initializing sqlContext')
>> sqlContext = SQLContext(sc)
>> dataprocessing(filename, iterations, sqlContext)
>> logger.info('Out of here..')
>> ##
>>
>>
>> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni 
>> wrote:
>>
>>> Uh, believe me, there are lots of people on this list who will send you
>>> code snippets if you ask... 
>>>
>>> Yes that is what Steve pointed out, suggesting also that for that simple
>>> exercise you should perform all operations on a Spark standalone instead
>>> (or alternatively use an NFS on the cluster)
>>> I'd agree with his suggestion
>>> I suggest u another alternative:
>>> https://community.cloud.databricks.com/
>>>
>>> That's a ready-made cluster and you can run your Spark app as well as store
>>> data on the cluster (well, I haven't tried myself but I assume it's
>>> possible). Try that out... I will try your script there as I have an
>>> account there (though I guess you'll get there before me).
>>>
>>> Try that out and let me know if u get stuck
>>> Kr
>>>
>>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" 
>>> wrote:
>>>
 Hi Marco,