RE: Broadcasting a parquet file using spark and python

2015-12-07 Thread Shuai Zheng
Hi Michael,

 

Thanks for feedback.

 

I am using version 1.5.2 now.

 

Can you tell me how to enforce a broadcast join? I don't want to let the engine decide the execution path of the join; I want to use a hint or a parameter to enforce a broadcast join (because I also have some inner-join cases where I want to use a broadcast join).
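For reference, Spark 1.5 added an explicit broadcast hint to the DataFrame API (org.apache.spark.sql.functions.broadcast). A minimal Java sketch, using placeholder DataFrame names and keys rather than anything from this thread, and assuming an existing SQLContext named sqlContext:

    DataFrame big = sqlContext.read().parquet("s3n://bucket/big");
    DataFrame small = sqlContext.read().parquet("s3n://bucket/small");

    // Wrapping the smaller side in broadcast(...) asks the planner for a broadcast join
    // regardless of spark.sql.autoBroadcastJoinThreshold.
    DataFrame joined = big.join(org.apache.spark.sql.functions.broadcast(small),
            big.col("key").equalTo(small.col("key")), "left_outer");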

 

Or is there any ticket or roadmap for this feature?

 

Regards,

 

Shuai

 

 

From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Saturday, December 05, 2015 4:11 PM
To: Shuai Zheng
Cc: Jitesh chandra Mishra; user
Subject: Re: Broadcasting a parquet file using spark and python

 

I believe we started supporting broadcast outer joins in Spark 1.5.  Which 
version are you using? 

 

On Fri, Dec 4, 2015 at 2:49 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi all,

 

Sorry to re-open this thread.

 

I have a similar issue: one big parquet file left-outer-joined with quite a few smaller parquet files. But the run is extremely slow and sometimes even OOMs (with 300M ...). I have two questions here:

 

1, If I use an outer join, will Spark SQL automatically use a broadcast hash join?

2, If not, in the latest documentation:
http://spark.apache.org/docs/latest/sql-programming-guide.html

 


spark.sql.autoBroadcastJoinThreshold (default: 10485760, i.e. 10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

 

How can I do this (run the ANALYZE TABLE command) in Java? I know I can code it myself (create a broadcast variable and implement the lookup by hand), but that makes the code very ugly.
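A minimal sketch of one way to do this from Java, assuming a HiveContext and a Parquet table that is already registered in the Hive metastore (the table name my_table is a placeholder):

    import org.apache.spark.sql.hive.HiveContext;

    HiveContext hiveCtx = new HiveContext(ctx.sc());   // ctx is the JavaSparkContext

    // Collect size statistics so the planner can compare the table against
    // spark.sql.autoBroadcastJoinThreshold and choose a broadcast join.
    hiveCtx.sql("ANALYZE TABLE my_table COMPUTE STATISTICS noscan");

    // Optionally raise the threshold (in bytes) if the small side is larger than 10 MB.
    hiveCtx.setConf("spark.sql.autoBroadcastJoinThreshold", "104857600");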

 

I hope we can have either an API or a hint to enforce the broadcast hash join (instead of relying on this rather indirect autoBroadcastJoinThreshold parameter). Do we have any ticket or roadmap for this feature?

 

Regards,

 

Shuai

 

From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Wednesday, April 01, 2015 2:01 PM
To: Jitesh chandra Mishra
Cc: user
Subject: Re: Broadcasting a parquet file using spark and python

 

You will need to create a hive parquet table that points to the data and run 
"ANALYZE TABLE tableName noscan" so that we have statistics on the size.

 

On Tue, Mar 31, 2015 at 9:36 PM, Jitesh chandra Mishra <jitesh...@gmail.com> 
wrote:

Hi Michael,

 

Thanks for your response. I am running 1.2.1. 

 

Is there any workaround to achieve the same with 1.2.1?

 

Thanks,

Jitesh

 

On Wed, Apr 1, 2015 at 12:25 AM, Michael Armbrust <mich...@databricks.com> 
wrote:

In Spark 1.3 I would expect this to happen automatically when the parquet table 
is small (< 10mb, configurable with spark.sql.autoBroadcastJoinThreshold).  If 
you are running 1.3 and not seeing this, can you show the code you are using to 
create the table?

 

On Tue, Mar 31, 2015 at 3:25 AM, jitesh129 <jitesh...@gmail.com> wrote:

How can we implement a BroadcastHashJoin for spark with python?

My SparkSQL inner joins are taking a lot of time since it is performing
ShuffledHashJoin.

Tables on which join is performed are stored as parquet files.

Please help.

Thanks and regards,
Jitesh




 

 

 

 




RE: Spark Tasks on second node never return in Yarn when I have more than 1 task node

2015-11-24 Thread Shuai Zheng
Hi All,

 

Just an update on this case.

 

I tried many different combinations of settings (and I just upgraded to the latest EMR 4.2.0 with Spark 1.5.2).

 

I just found out that the problem comes from:

 

spark-submit --deploy-mode client --executor-cores=24 --driver-memory=5G 
--executor-memory=45G

 

If I set --executor-cores=20 (or anything less than 20), there is no issue.

 

This is quite an interesting case, because the instance (C3*8xlarge) has 32 virtual cores and runs without any issue with one task node.

 

So I guess the issue comes from either:

1, a connection limit from the EC2 instance on EMR to S3 (this reason doesn't make much sense to me; I will contact EMR support to clarify), or

2, some library packed in the jar causing this limit (also not very plausible).

 

Reporting here in case anyone faces a similar issue.

 

Regards,

 

Shuai

 

From: Jonathan Kelly [mailto:jonathaka...@gmail.com] 
Sent: Thursday, November 19, 2015 6:54 PM
To: Shuai Zheng
Cc: user
Subject: Re: Spark Tasks on second node never return in Yarn when I have more 
than 1 task node

 

I don't know if this actually has anything to do with why your job is hanging, 
but since you are using EMR you should probably not set those fs.s3 properties 
but rather let it use EMRFS, EMR's optimized Hadoop FileSystem implementation 
for interacting with S3. One benefit is that it will automatically pick up your 
AWS credentials from your EC2 instance role rather than you having to configure 
them manually (since doing so is insecure because you have to get the secret 
access key onto your instance).

 

If simply making that change does not fix the issue, a jstack of the hung 
process would help you figure out what it is doing. You should also look at the 
YARN container logs (which automatically get uploaded to your S3 logs bucket if 
you have this enabled).

 

~ Jonathan

 

On Thu, Nov 19, 2015 at 1:32 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi All,

 

I am facing a very weird case. I have already simplified the scenario as much as possible so everyone can reproduce it.

 

My env:

 

AWS EMR 4.1.0, Spark1.5

 

My code runs without any problem in local mode, and it has no problem when it runs on an EMR cluster with one master and one task node.

But when I try to run with multiple nodes (more than 1 task node, which means a 3-node cluster), the tasks on one of them never return.

 

The log is as below:

 

15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

 

So you can see the tasks are submitted alternately to two instances, one being ip-10-165-121-188 and the other ip-10-155-160-147.

And later, only the tasks that run on ip-10-165-121-188.ec2 finish; the ones on ip-10-155-160-147.ec2 just wait there and never return.

 

The data and code have been tested in local mode and single-node cluster mode, so it should not be an issue with the logic or the data.

 

And I have attached my test case here (I believe it is simple enough and no business logic is involved):

 

    public void createSiteGridExposure2() {
        JavaSparkContext ctx = this.createSparkContextTest("Test");
        ctx.textFile(siteEncodeLocation)
           .flatMapToPair(new PairFlatMapFunction<String, String, String>() {
               @Override
               public Iterable<Tuple2<String, String>> call(String line) throws Exception {
                   List<Tuple2<String, String>> res = new ArrayList<Tuple2<String, String>>();
                   return res;
               }
           })
           .collectAsMap();
        ctx.stop();
    }

 

    protected JavaSparkContext createSparkContextTest(String appName) {
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = ctx.hadoopConfiguration();
        if (awsAccessKeyId != null) {
            hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
            hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
            hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);

            hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
            hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
            hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
        }
        return ctx;
    }

Spark Tasks on second node never return in Yarn when I have more than 1 task node

2015-11-19 Thread Shuai Zheng
Hi All,

 

I am facing a very weird case. I have already simplified the scenario as much as possible so everyone can reproduce it.

 

My env:

 

AWS EMR 4.1.0, Spark1.5

 

My code runs without any problem in local mode, and it has no problem when it runs on an EMR cluster with one master and one task node.

But when I try to run with multiple nodes (more than 1 task node, which means a 3-node cluster), the tasks on one of them never return.

 

The log is as below:

 

15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
0, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
1, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
2, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
3, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

 

So you can see the tasks are submitted alternately to two instances, one being ip-10-165-121-188 and the other ip-10-155-160-147.

And later, only the tasks that run on ip-10-165-121-188.ec2 finish; the ones on ip-10-155-160-147.ec2 just wait there and never return.

 

The data and code have been tested in local mode and single-node cluster mode, so it should not be an issue with the logic or the data.

And I have attached my test case here (I believe it is simple enough and no business logic is involved):

 

    public void createSiteGridExposure2() {
        JavaSparkContext ctx = this.createSparkContextTest("Test");
        ctx.textFile(siteEncodeLocation)
           .flatMapToPair(new PairFlatMapFunction<String, String, String>() {
               @Override
               public Iterable<Tuple2<String, String>> call(String line) throws Exception {
                   List<Tuple2<String, String>> res = new ArrayList<Tuple2<String, String>>();
                   return res;
               }
           })
           .collectAsMap();
        ctx.stop();
    }

 

    protected JavaSparkContext createSparkContextTest(String appName) {
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = ctx.hadoopConfiguration();
        if (awsAccessKeyId != null) {
            hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
            hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
            hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);

            hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
            hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
            hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
        }
        return ctx;
    }

 

 

Does anyone have any idea why this happens? I am a bit lost, because the code works in local mode and on a 2-node (1 master, 1 task) cluster, but when it moves to a cluster with multiple task nodes I hit this issue. No error, no exception, not even a timeout (I waited more than 1 hour and there was no timeout either).

 

Regards,

 

Shuai



RE: [Spark 1.5]: Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space -- Work in 1.4, but 1.5 doesn't

2015-11-04 Thread Shuai Zheng
An update: this ONLY happens in Spark 1.5. I ran it under Spark 1.4 and 1.4.1 and there is no issue (the program was originally developed under Spark 1.4, and I just re-tested it; it works). So this proves there is no issue with the logic or data; it is caused by the new version of Spark.

 

So I want to know whether there is any new setting I should apply in Spark 1.5 to make it work.

 

Regards,

 

Shuai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Wednesday, November 04, 2015 3:22 PM
To: user@spark.apache.org
Subject: [Spark 1.5]: Exception in thread "broadcast-hash-join-2"
java.lang.OutOfMemoryError: Java heap space

 

Hi All,

 

I have a program which runs a somewhat complex business join in Spark, and I get the exception below.

 

I am running on Spark 1.5 with these parameters:

 

spark-submit --deploy-mode client --executor-cores=24 --driver-memory=2G --executor-memory=45G --class ...

 

Some other setup:

 

sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buff
er.max", "2047m");

sparkConf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps").set("spark.sql.autoBroadcastJoinThreshold",
"104857600");

 

This is running on an AWS c3*8xlarge instance. I am not sure what parameters I should set given the OutOfMemoryError below.

 

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 10181"...
Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:380)
        at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:123)
        at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:95)
        at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:85)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
        at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

 

Any hint will be very helpful.
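For reference, the broadcast-hash-join thread in this trace runs on the driver while it builds the broadcast relation, so one direction to try (an assumption, not a confirmed fix from this thread) is to give the driver more heap or to shrink the broadcast threshold:

    // Sketch only: adjust one or both knobs and re-test.
    sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "10485760"); // back to the 10 MB default

    // Driver memory must be set before the driver JVM starts, e.g. on the command line:
    // spark-submit --deploy-mode client --driver-memory 8G --executor-memory 45G ...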

 

Regards,

 

Shuai




How to Take the whole file as a partition

2015-09-03 Thread Shuai Zheng
Hi All,

 

I have 1000 files, from 500 MB to 1-2 GB each at the moment, and I want Spark to read each of them as one partition at the file level, which means I want the FileSplit turned off.

 

I know there are some solutions, but they are not very good in my case:

1, I can't use the wholeTextFiles method, because my files are too big and I don't want to risk the performance.

2, I tried to use newAPIHadoopFile and turn off the file split:

 

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf).values()
        .map(new Function<Text, String>() {
            @Override
            public String call(Text arg0) throws Exception {
                return arg0.toString();
            }
        });

 

This works for some cases, but it truncates some lines (I am not sure why, but it looks like there is a limit on this file reading). I have a feeling that Spark truncates the file at 2 GB. In any case it happens (the same data has no issue when I use MapReduce for the input); Spark sometimes truncates a very big file when trying to read all of it. (See the sketch of such a non-splittable input format further below.)
 

3, Another way is to distribute the file names as the input of the Spark job and open a stream inside the function to read each file directly. This is what I am planning to do, but I think it is ugly. Does anyone have a better solution?

 

BTW: the files are currently in text format, but they might be in parquet format later; that is also the reason I don't like my third option.
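The code in option 2 references a NonSplitableTextInputFormat class that is not shown in the thread; a minimal sketch of what it presumably looks like (a new-API TextInputFormat with splitting disabled, so each file becomes exactly one split and therefore one partition):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Returning false from isSplitable() makes Hadoop produce one split per file,
    // which Spark then turns into one partition per file.
    public class NonSplitableTextInputFormat extends TextInputFormat {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    }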

 

Regards,

 

Shuai



RE: How to Take the whole file as a partition

2015-09-03 Thread Shuai Zheng
Hi, 

 

Is there any way to change the default split size when loading data in Spark? By default it is 64 MB. I know how to change this in Hadoop MapReduce, but I am not sure how to do it in Spark.
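Since the job above already passes a Hadoop Configuration into newAPIHadoopFile, one option (a sketch, assuming the Hadoop 2 new-API property names) is to set the split-size properties on that configuration:

    Configuration hadoopConf = ctx.hadoopConfiguration();
    // Values are in bytes; 4 GB is only an illustrative bound that keeps each
    // 500 MB - 2 GB file in a single split.
    long fourGb = 4L * 1024 * 1024 * 1024;
    hadoopConf.set("mapreduce.input.fileinputformat.split.minsize", String.valueOf(fourGb));
    hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(fourGb));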

 

Regards,

 

Shuai

 

From: Tao Lu [mailto:taolu2...@gmail.com] 
Sent: Thursday, September 03, 2015 11:07 AM
To: Shuai Zheng
Cc: user
Subject: Re: How to Take the whole file as a partition

 

Your situation is special. It seems to me Spark may not fit well in your case.

 

You want to process the individual files (500M~2G) as a whole, you want good 
performance. 

 

You may want to write your own Scala/Java program, distribute it along with those files across your cluster, and run the copies in parallel.

 

If you insist on using Spark, maybe option 3 is closer. 

 

Cheers,

Tao

 

 

On Thu, Sep 3, 2015 at 10:22 AM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi All,

 

I have 1000 files, from 500M to 1-2GB at this moment. And I want my spark can 
read them as partition on the file level. Which means want the FileSplit turn 
off.

 

I know there are some solutions, but not very good in my case:

1, I can’t use WholeTextFiles method, because my file is too big, I don’t want 
to risk the performance.

2, I try to use newAPIHadoopFile and turnoff the file split:

 

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf).values()
        .map(new Function<Text, String>() {
            @Override
            public String call(Text arg0) throws Exception {
                return arg0.toString();
            }
        });

 

This works for some cases, but it truncate some lines (I am not sure why, but 
it looks like there is a limit on this file reading). I have a feeling that the 
spark truncate this file on 2GB bytes. Anyway it happens (because same data has 
no issue when I use mapreduce to do the input), the spark sometimes do a trunc 
on very big file if try to read all of them.

 

3, I can do another way is distribute the file name as the input of the Spark 
and in function open stream to read the file directly. This is what I am 
planning to do but I think it is ugly. I want to know anyone have better 
solution for it?

 

BTW: the file currently in text format, but it might be parquet format later, 
that is also reason I don’t like my third option.

 

Regards,

 

Shuai





 

-- 



Thanks!
Tao



org.apache.hadoop.security.AccessControlException: Permission denied when access S3

2015-08-20 Thread Shuai Zheng
Hi All,

 

I am trying to access a file on S3 through the Hadoop file input format:

 

Below is my code:

 

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.awsAccessKeyId", this.getAwsAccessKeyId());
hadoopConf.set("fs.s3n.awsSecretAccessKey", this.getAwsSecretAccessKey());
lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf).values()
        .map(new Function<Text, String>() {
            @Override
            public String call(Text arg0) throws Exception {
                return arg0.toString();
            }
        });

And I get the error below:

 

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: s3n://
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)

 

The permissions should not be a problem (because I can use ctx.textFile without any issue), so the issue comes from the newAPIHadoopFile call.

 

Is there anything else I need to set up for this?

 

Regards,

 

Shuai



RE: [SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyI

2015-06-10 Thread Shuai Zheng
I have tried both cases (s3 and s3n, setting all possible parameters), and trust me, the same code works with 1.3.1 but not with 1.3.0, 1.4.0, or 1.5.0.

 

I even used a plain project to test this, using Maven to include all referenced libraries, but it still gives me the error.

 

I think anyone can easily replicate my issue locally (the code doesn't need to run on EC2; I run it directly from my local Windows PC).

 

Regards,

 

Shuai

 

From: Aaron Davidson [mailto:ilike...@gmail.com] 
Sent: Wednesday, June 10, 2015 12:28 PM
To: Shuai Zheng
Subject: Re: [SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and 
Secret Access Key must be specified as the username or password (respectively) 
of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey 
properties (respectively)

 

That exception is a bit weird as it refers to fs.s3 instead of fs.s3n. Maybe 
you are accidentally using s3://? Otherwise, you might try also specifying that 
property too. 
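A minimal sketch of that suggestion (setting the fs.s3 properties named in the exception alongside the fs.s3n ones; whether this actually resolves the 1.4/1.5 behaviour is not confirmed in this thread):

    Configuration hadoopConf = ctx.hadoopConfiguration();
    // The exception complains about fs.s3.awsAccessKeyId even though the URL is s3n://,
    // so set both property families as a workaround.
    hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
    hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);
    hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
    hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);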

On Jun 9, 2015 12:45 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I have some code to access s3 from Spark. The code is as simple as:

 

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration hadoopConf = ctx.hadoopConfiguration();

hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", "---");
hadoopConf.set("fs.s3n.awsSecretAccessKey", "--");
SQLContext sql = new SQLContext(ctx);
DataFrame grid_lookup = sql.parquetFile("s3n://---");
grid_lookup.count();
ctx.stop();

 

The code works on 1.3.1. On 1.4.0 and the latest 1.5.0, it always gives me the exception below:

 

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).

 

I don't know why. I remember this was a known issue in 1.3.0 (https://issues.apache.org/jira/browse/SPARK-6330) and was solved in 1.3.1. But now it is not working again in a newer version?

 

I remember that when I switched to 1.4.0 it worked for a while (while I was working with the master branch of the latest source code), but after refreshing to the latest code I get this error again.

 

Anyone has idea?

 

Regards,

 

Shuai




[SQL][1.3.1][JAVA]Use UDF in DataFrame join

2015-04-27 Thread Shuai Zheng
Hi All,

 

I want to ask how to use a UDF in the DataFrame join function. It always gives me a "cannot resolve the column name" error.

Can anyone give me an example of how to do this in Java?

 

My code is like:

 

edmData.join(yb_lookup,
    edmData.col("year(YEARBUILT)").equalTo(yb_lookup.col("yb_class_vdm")));

 

But this won't work in Java. I understand that the col function should only take a column name, but is there a way to specify a simple expression in the join statement?
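One possible workaround (a sketch, not an answer from this thread; it assumes a SQLContext named sql as in the poster's other messages): register the UDF, materialize the derived key as a real column with selectExpr, and then join on plain column equality. The names toYear and yb_year are made up for illustration.

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    sql.udf().register("toYear", new UDF1<java.sql.Date, Integer>() {
        @Override
        public Integer call(java.sql.Date d) throws Exception {
            return d.getYear();   // note: Date.getYear() is offset by 1900
        }
    }, DataTypes.IntegerType);

    DataFrame edmWithYear = edmData.selectExpr("*", "toYear(YEARBUILT) as yb_year");
    DataFrame joined = edmWithYear.join(yb_lookup,
            edmWithYear.col("yb_year").equalTo(yb_lookup.col("yb_class_vdm")));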

 

Regards,

 

Shuai



[SQL][1.3.1][JAVA] UDF in java cause Task not serializable

2015-04-27 Thread Shuai Zheng
Hi All,

 

Basically I try to define a simple UDF and use it in the query, but it gives me "Task not serializable":

 

    public void test() {
        RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
        RiskGroupModelDefinition edm = this.createEdm();
        JavaSparkContext ctx = this.createSparkContext();
        SQLContext sql = new SQLContext(ctx);
        sql.udf().register("year", new UDF1<Date, Integer>() {
            @Override
            public Integer call(Date d) throws Exception {
                return d.getYear();
            }
        }, new org.apache.spark.sql.types.IntegerType());

        /** Retrieve all tables for EDM */
        DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
        property.registerTempTable("p");

        DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
        yb_lookup.registerTempTable("yb");
        sql.sql("select * from p left join yb on year(p.YEARBUILT)=yb.yb_class_vdm").count();
        ctx.stop();
    }

 

If I remove the UDF and just use p.YEARBUILT=yb.yb_class_vdm, the SQL runs without any problem. But after I add the UDF to the query (as in the code above), I get the exception below:

 

 

Exception in thread main
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:

Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]

Exchange SinglePartition

  Aggregate true, [], [COUNT(1) AS PartialCount#43L]

   Project []

HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter,
None

 Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)

  Project [YEARBUILT#14]

   Filter ISVALID#18

PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at
newParquet.scala:573

 Exchange (HashPartitioning [yb_class_vdm#40L], 200)

  PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at
newParquet.scala:573

 

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

   at
org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)

   at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)

   at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)

   at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)

   at
com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(Vulna
bilityEncodeExecutor.java:137)

   at
com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(Vulna
bilityEncodeExecutor.java:488)

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:

Exchange SinglePartition

Aggregate true, [], [COUNT(1) AS PartialCount#43L]

  Project []

   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter,
None

Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)

 Project [YEARBUILT#14]

  Filter ISVALID#18

   PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at
newParquet.scala:573

Exchange (HashPartitioning [yb_class_vdm#40L], 200)

 PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at
newParquet.scala:573

 

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

   at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)

   at
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.
scala:126)

   at
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.
scala:125)

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)

   ... 6 more

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:

Aggregate true, [], [COUNT(1) AS PartialCount#43L]

Project []

  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter,
None

   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)

Project [YEARBUILT#14]

 Filter ISVALID#18

  PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at
newParquet.scala:573

   Exchange (HashPartitioning [yb_class_vdm#40L], 200)

PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at
newParquet.scala:573

 

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

   at
org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)

   at
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.sc
ala:101)

   at
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.sc
ala:49)

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)

   ... 10 more

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:

Exchange 
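The plan and stack trace above are cut off by the archive. A common cause of "Task not serializable" with a UDF written as an anonymous inner class is that the anonymous class captures a reference to the enclosing, non-serializable object; a sketch of the usual workaround (an assumption, since this thread has no reply) is to move the UDF into a static nested or top-level class:

    // A static nested class keeps no hidden reference to the enclosing executor object.
    public static class YearUdf implements UDF1<Date, Integer> {
        @Override
        public Integer call(Date d) throws Exception {
            return d.getYear();
        }
    }

    // registration stays the same otherwise:
    // sql.udf().register("year", new YearUdf(), new org.apache.spark.sql.types.IntegerType());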

RE: Slower performance when bigger memory?

2015-04-27 Thread Shuai Zheng
Thanks. May I ask what your configuration is for more/smaller executors on r3.8xlarge, and how much memory you eventually decided to give one executor without impacting performance (for example, 64g)?

 

From: Sven Krasser [mailto:kras...@gmail.com] 
Sent: Friday, April 24, 2015 1:59 PM
To: Dean Wampler
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: Slower performance when bigger memory?

 

FWIW, I ran into a similar issue on r3.8xlarge nodes and opted for more/smaller 
executors. Another observation was that one large executor results in less 
overall read throughput from S3 (using Amazon's EMRFS implementation) in case 
that matters to your application.
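For concreteness, a sketch of the kind of more/smaller-executor layout being discussed for a single r3.8xlarge (32 cores, 244 GB); the numbers are illustrative, not a recommendation from this thread:

    SparkConf conf = new SparkConf()
        .setAppName("smaller-executors")
        // 4 executors of 8 cores / 50 GB each instead of one 32-core / 200 GB executor,
        // keeping every heap well below the ~64 GB range where GC overhead grows.
        .set("spark.executor.instances", "4")
        .set("spark.executor.cores", "8")
        .set("spark.executor.memory", "50g");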

-Sven

 

On Thu, Apr 23, 2015 at 10:18 AM, Dean Wampler deanwamp...@gmail.com wrote:

JVM's often have significant GC overhead with heaps bigger than 64GB. You might 
try your experiments with configurations below this threshold.

 

dean




Dean Wampler, Ph.D.

Author: Programming Scala, 2nd Edition 
http://shop.oreilly.com/product/0636920033073.do  (O'Reilly)

Typesafe http://typesafe.com 
@deanwampler http://twitter.com/deanwampler 

http://polyglotprogramming.com

 

On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I am running some benchmark on r3*8xlarge instance. I have a cluster with one 
master (no executor on it) and one slave (r3*8xlarge).

 

My job has 1000 tasks in stage 0.

 

R3*8xlarge has 244G memory and 32 cores.

 

If I create 4 executors, each with 8 cores + 50G memory, each task takes around 320s-380s. If I instead use one big executor with 32 cores and 200G memory, each task takes 760s-900s.

 

And when I check the log, it looks like minor GC takes much longer when using 200G memory:

 

285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 sys=120.65, real=11.25 secs]

 

And when it uses 50G memory, the minor GC takes only less than 1s.

 

I am trying to figure out the best way to configure Spark. For a particular reason I am tempted to use bigger memory on a single executor if there is no significant performance penalty, but now it looks like there is?

 

Anyone has any idea?

 

Regards,

 

Shuai

 




-- 

www.skrasser.com http://www.skrasser.com/?utm_source=sig 





RE: Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Shuai Zheng
Got it. Thanks! :)

 

 

From: Yin Huai [mailto:yh...@databricks.com] 
Sent: Thursday, April 23, 2015 2:35 PM
To: Shuai Zheng
Cc: user
Subject: Re: Bug? Can't reference to the column by name after join two 
DataFrame on a same name key

 

Hi Shuai,

 

You can use "as" to create a table alias. For example, df1.as("df1"). Then you can use $"df1.col" to refer to it.
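In the Java DataFrame API the same idea looks roughly like this (a sketch; the key is to disambiguate through a specific parent DataFrame instead of the bare column name):

    DataFrame a = df1.as("d1");
    DataFrame b = df2.as("d2");

    // Join on the shared key, then select it through one particular side
    // instead of the ambiguous bare name "col".
    DataFrame df3 = a.join(b, a.col("col").equalTo(b.col("col")))
                     .select(a.col("col"));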

 

Thanks,

 

Yin

 

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I use 1.3.1

 

When I have two DF and join them on a same name key, after that, I can’t get 
the common key by name.

 

Basically:

select * from t1 inner join t2 on t1.col1 = t2.col1

 

And I am using purely DataFrame, spark SqlContext not HiveContext

 

DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col");

 

because df1 and df2 join on the same key col,

 

Then I can't reference the key col. I understand I should use a fully qualified name for that column (like t1.col in SQL), but I don't know how to address this in Spark SQL.

 

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#8L, id#0L.;

 

It looks like the joined key can't be referenced by name or via the df1.col(name) pattern.

https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I am not sure whether it is the same issue, but I still see the issue in the latest code.

 

It looks like the result after join won't keep the parent DF information 
anywhere?

 

I checked the ticket: https://issues.apache.org/jira/browse/SPARK-6273

But I am not sure whether it is the same issue. Should I open a new ticket for this?

 

Regards,

 

Shuai

 

 




RE: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-22 Thread Shuai Zheng
Below is my code to access s3n without problems (this is for 1.3.1 only; there is a bug in 1.3.0).

 

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);

 

Regards,

 

Shuai

 

From: Sujee Maniyam [mailto:su...@sujee.net] 
Sent: Wednesday, April 22, 2015 12:45 PM
To: Spark User List
Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme 
s3n:)

 

Hi all,

I am unable to access s3n:// URLs using sc.textFile(); I get a 'no file system for scheme s3n' error.

Is this a bug, or are some conf settings missing?

 

See below for details:

 

env variables : 

AWS_SECRET_ACCESS_KEY=set

AWS_ACCESS_KEY_ID=set

 

spark/RELEASE:

Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034

 

 

./bin/spark-shell

val f = sc.textFile("s3n://bucket/file")
f.count

 

error== 

java.io.IOException: No FileSystem for scheme: s3n

at 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)

at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)

at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)

at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)

at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)

at org.apache.spark.rdd.RDD.count(RDD.scala:1006)

        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $iwC$$iwC$$iwC.<init>(<console>:37)
        at $iwC$$iwC.<init>(<console>:39)
        at $iwC.<init>(<console>:41)
        at <init>(<console>:43)
        at .<init>(<console>:47)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)

at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)

at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)

at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)

at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)

at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)

at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)

at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)

at 

RE:RE:maven compile error

2015-04-21 Thread Shuai Zheng
I have a similar issue (I failed on the Spark Core project but with the same exception as you). Then I followed the steps below (I am working on Windows):

Delete the Maven repository and re-download all dependencies. The issue sounds like a corrupted jar that Maven can't open.

 

Other than this, I also did the steps below (I don't think they are the solution, I just describe my steps):

1, Uninstall the Scala 2.11 version I had installed, so that I only have 2.10.5 on my PC

2, Upgrade Maven to the latest 3.3.1

3, Install the latest git client

 

Regards,

 

Shuai

 

From: myelinji [mailto:myeli...@aliyun.com]
Sent: Friday, April 03, 2015 6:58 AM
To: Sean Owen
Cc: Spark user group
Subject: Re: maven compile error

 

Thank you for your reply. When I use Maven to compile the whole project, the errors are as follows:

 

[INFO] Spark Project Parent POM .. SUCCESS [4.136s]
[INFO] Spark Project Networking .. SUCCESS [7.405s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [5.071s]
[INFO] Spark Project Core  SUCCESS [3:08.445s]
[INFO] Spark Project Bagel ... SUCCESS [21.613s]
[INFO] Spark Project GraphX .. SUCCESS [58.915s]
[INFO] Spark Project Streaming ... SUCCESS [1:26.202s]
[INFO] Spark Project Catalyst  FAILURE [1.537s]
[INFO] Spark Project SQL . SKIPPED
[INFO] Spark Project ML Library .. SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project Hive  SKIPPED
[INFO] Spark Project REPL  SKIPPED
[INFO] Spark Project Assembly  SKIPPED
[INFO] Spark Project External Twitter  SKIPPED
[INFO] Spark Project External Flume Sink . SKIPPED
[INFO] Spark Project External Flume .. SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project External ZeroMQ . SKIPPED
[INFO] Spark Project External Kafka .. SKIPPED
[INFO] Spark Project Examples  SKIPPED

 

It seems like there is something wrong with the catalyst project. Why can't I compile this project?

 

 

------------------------------------------------------------------

From: Sean Owen so...@cloudera.com

Sent: Friday, April 3, 2015 17:48

To: myelinji myeli...@aliyun.com

Cc: Spark user group user@spark.apache.org

Subject: Re: maven compile error

 

If you're asking about a compile error, you should include the command
you used to compile.

I am able to compile branch 1.2 successfully with mvn -DskipTests
clean package.

This error is actually an error from scalac, not a compile error from
the code. It sort of sounds like it has not been able to download
scala dependencies. Check or maybe recreate your environment.

On Fri, Apr 3, 2015 at 3:19 AM, myelinji myeli...@aliyun.com wrote:
 Hi,all:
 Just now i checked out spark-1.2 on github , wanna to build it use maven,
 how ever I encountered an error during compiling:

 [INFO]
 
 [ERROR] Failed to execute goal
 net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on
 project spark-catalyst_2.10: wrap:
 scala.reflect.internal.MissingRequirementError: object scala.runtime in
 compiler mirror not found. - [Help 1]
 org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
 goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile
 (scala-compile-first) on project spark-catalyst_2.10: wrap:
 scala.reflect.internal.MissingRequirementError: object scala.runtime in
 compiler mirror not found.
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
 at
 org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
 at
 org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
 at
 org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
 at
 org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
 at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
 at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
 at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
 at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
 at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] when create context

2015-04-08 Thread Shuai Zheng
Hi All,

 

In some cases I get the exception below when I run Spark in local mode (I haven't seen this on a cluster). This is weird, and it also affects my local unit test cases (it does not always happen, but usually about once every 4-5 runs). From the stack it looks like the error happens when creating the context, but I don't know why, or what parameters I can set to solve this issue.

 

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at akka.remote.Remoting.start(Remoting.scala:180)
        at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
        at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
        at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
        at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
        at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
        at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823)
        at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:270)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
        at com.***.executor.FinancialEngineExecutor.run(FinancialEngineExecutor.java:110)

 

Regards,

 

Shuai

 



RE: --driver-memory parameter doesn't work for spark-submmit on yarn?

2015-04-07 Thread Shuai Zheng
Sorry for the late reply.

I bypassed this by setting _JAVA_OPTIONS.

And the output of ps aux | grep spark:

hadoop   14442  0.6  0.2 34334552 128560 pts/0 Sl+  14:37   0:01 
/usr/java/latest/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper 
--driver-memory=5G --executor-memory=10G --master yarn-client --class 
com.***.FinancialEngineExecutor 
/home/hadoop/lib/Engine-2.0-jar-with-dependencies.jar 
hadoop   14544  158 13.4 37206420 8472272 pts/0 Sl+ 14:37   4:21 
/usr/java/latest/bin/java -cp 
/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar::/home/hadoop/spark/conf:/home/hadoop/spark/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/home/hadoop/spark/lib/datanucleus-core-3.2.10.jar:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/hadoop/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/hadoop/conf:/home/hadoop/conf
 -XX:MaxPermSize=128m -Dspark.driver.log.level=INFO -Xms512m -Xmx512m 
org.apache.spark.deploy.SparkSubmit --driver-memory=5G --executor-memory=10G 
--master yarn-client --class com.*executor.FinancialEngineExecutor 
/home/hadoop/lib/MiddlewareEngine-2.0-jar-with-dependencies.jar 

The above already has _JAVA_OPTIONS=-Xmx30g set, but it doesn't seem to show up in the command line. I guess SparkSubmit reads _JAVA_OPTIONS, but I would have expected it to be overwritten by the command-line params. I'm not sure what happens here and have no time to dig into it, but if you want me to provide more information I will be happy to do that.

Regards,

Shuai


-Original Message-
From: Bozeman, Christopher [mailto:bozem...@amazon.com] 
Sent: Wednesday, April 01, 2015 4:59 PM
To: Shuai Zheng; 'Sean Owen'
Cc: 'Akhil Das'; user@spark.apache.org
Subject: RE: --driver-memory parameter doesn't work for spark-submmit on yarn?

Shuai,

What did  ps aux | grep spark-submit reveal?

When you compare using _JAVA_OPTIONS and without using it, where do you see the 
difference?

Thanks
Christopher




-Original Message-
From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Wednesday, April 01, 2015 11:12 AM
To: 'Sean Owen'
Cc: 'Akhil Das'; user@spark.apache.org
Subject: RE: --driver-memory parameter doesn't work for spark-submmit on yarn?

Nice.

But my case shows that even when I use yarn-client, I have the same issue. I verified it several times.

And I am running 1.3.0 on EMR (using the version dispatched by the installSpark script from AWS).

I agree _JAVA_OPTIONS is not the right solution, but I will use it until 1.4.0 is out :)

Regards,

Shuai

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 01, 2015 10:51 AM
To: Shuai Zheng
Cc: Akhil Das; user@spark.apache.org
Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn?

I feel like I recognize that problem, and it's almost the inverse of
https://issues.apache.org/jira/browse/SPARK-3884 which I was looking at today. 
The spark-class script didn't seem to handle all the ways that driver memory 
can be set.

I think this is also something fixed by the new launcher library in 1.4.0.

_JAVA_OPTIONS is not a good solution since it's global.

On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 Hi Akhil,



 Thanks a lot!



 After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this confuses me: does the driver-memory option not work for spark-submit on YARN (I haven't checked other clusters)? Is it a bug?



 Regards,



 Shuai





 From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
 Sent: Wednesday, April 01, 2015 2:40 AM
 To: Shuai Zheng
 Cc: user@spark.apache.org
 Subject: Re: --driver-memory parameter doesn't work for spark-submmit 
 on yarn?



 Once you submit the job do a ps aux | grep spark-submit and see how 
 much is the heap space allocated to the process (the -Xmx params), if 
 you are seeing a lower value you could try increasing it yourself with:



 export _JAVA_OPTIONS=-Xmx5g


 Thanks

 Best Regards



 On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 Below is the my shell script:



 /home/hadoop/spark/bin/spark-submit --driver-memory=5G 
 --executor-memory=40G --master yarn-client --class 
 com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar 
 s3://bucket/vriscBatchConf.properties



 My driver will load some resources and then broadcast to all executors.



 That resource is only 600MB in serialized format, but I always get an 
 out-of-memory exception; it looks like the right amount of memory is not 
 allocated to my driver.



 Exception in thread main java.lang.OutOfMemoryError: Java heap space
 at java.lang.reflect.Array.newArray(Native Method)
 at java.lang.reflect.Array.newInstance(Array.java:70)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
 at java.io.ObjectInputStream.readObject0

RE: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

2015-04-07 Thread Shuai Zheng
I have found the issue, but I think it is a bug.

 

If I change my class to:

 

public class ModelSessionBuilder implements Serializable {

/**

* 

 */

.

private Properties[] propertiesList;

private static final long serialVersionUID =
-8139500301736028670L;

}

 

The broadcast value has no issue. But in my original form, where I broadcast it
as an array of my custom subclass of Properties, after the broadcast the
propertiesList array becomes an array of empty PropertiesUtils objects (empty,
not NULL). I am not sure why this happens (the code runs without any problem
with the default Java serializer). So I think this is a bug, but I am not sure
whether it is a bug in Spark or in Kryo.
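
One possible workaround (only a sketch, not verified against this exact case): register the Properties subclass with Kryo's built-in JavaSerializer through Spark's spark.kryo.registrator hook, so that just that class falls back to Java serialization while everything else stays on Kryo:

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.serializers.JavaSerializer
    import org.apache.spark.serializer.KryoRegistrator

    // Sketch of a registrator: keep Kryo for the other classes, but let the
    // Properties subclass fall back to plain Java serialization.
    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[ModelSessionBuilder])
        kryo.register(classOf[PropertiesUtils], new JavaSerializer())
      }
    }

    // driver side (the registrator class name here is illustrative):
    // sparkConf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")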

 

Regards,

 

Shuai


 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, April 06, 2015 5:34 PM
To: user@spark.apache.org
Subject: Broadcast value return empty after turn to
org.apache.spark.serializer.KryoSerializer

 

Hi All,

 

I have tested my code without problem on EMR yarn (spark 1.3.0) with default
serializer (java).

But when I switch to org.apache.spark.serializer.KryoSerializer, the
broadcast value doesn't give me right result (actually return me empty
custom class on inner object).

 

Basically I broadcast a builder object, which carry an array of
propertiesUtils object. The code should not have any logical issue because
it works on default java serializer. But when I turn to the
org.apache.spark.serializer.KryoSerializer, it looks like the Array doesn't
initialize, propertiesList will give a right size, but then all element in
the array is just a normal empty PropertiesUtils.

 

Am I missing anything when I use this KryoSerializer? I just put in the two lines
below. Do I need to implement some special code to enable KryoSerializer? I have
searched everywhere but can't find any place that mentions it.

 

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

sparkConf.registerKryoClasses(new Class[]{ModelSessionBuilder.class,
Constants.class, PropertiesUtils.class, ModelSession.class});

 

public class ModelSessionBuilder implements Serializable {

/**

* 

 */

.

private PropertiesUtils[] propertiesList;

private static final long serialVersionUID =
-8139500301736028670L;

}

 

public class PropertiesUtils extends Properties {

   /**

   * 

*/

   private static final long serialVersionUID = -3684043338580885551L;

 

   public PropertiesUtils(Properties prop) {

  super(prop);

   }

 

   public PropertiesUtils() {

  // TODO Auto-generated constructor stub

   }

}



 

Regards,

 

Shuai



set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?

2015-04-07 Thread Shuai Zheng
Hi All,

 

I am a bit confused about spark.storage.memoryFraction. It sets the memory area
for RDD storage; does that mean only cached and persisted RDDs? So if my
program has no cached RDD at all (that is, I have no .cache() or .persist()
call on any RDD), can I set spark.storage.memoryFraction to a very small number
or even zero?

 

I am writing a program which consumes a lot of memory (broadcast values,
runtime, etc.), but I have no cached RDD, so should I just turn
spark.storage.memoryFraction down to 0 (which would help me to improve
performance)?

 

And I have another issue with broadcast: when I try to get a broadcast value, it
throws an out-of-memory error. Which part of memory should I allocate more to
(if I can't increase my overall memory size)?

 

java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

 

 

Regards,

 

Shuai



RE: --driver-memory parameter doesn't work for spark-submmit on yarn?

2015-04-01 Thread Shuai Zheng
Hi Akhil,

 

Thanks a lot!

 

After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. 
But this confuses me: does the --driver-memory option not work for spark-submit 
to YARN (I haven't checked other clusters)? Is it a bug?

 

Regards,

 

Shuai

 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, April 01, 2015 2:40 AM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn?

 

Once you submit the job, do a ps aux | grep spark-submit and see how much heap 
space is allocated to the process (the -Xmx params); if you are seeing a lower 
value you could try increasing it yourself with:

 

export _JAVA_OPTIONS=-Xmx5g




Thanks

Best Regards

 

On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

Below is the my shell script:

 

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G 
--master yarn-client --class com.***.FinancialEngineExecutor 
/home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties 

 

My driver will load some resources and then broadcast to all executors.

 

That resource is only 600MB in serialized format, but I always get an out-of-memory 
exception; it looks like the right amount of memory is not allocated to my driver.

 

Exception in thread main java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

 

Am I doing anything wrong here? 

 

And no matter what I set for the --driver-memory value (from 512M to 20G), it 
always gives me an error on the same line (the line that tries to load a 600MB 
Java serialization file). So it looks like the script doesn't allocate the right 
memory to the driver in my case?

 

Regards,

 

Shuai

 



RE: --driver-memory parameter doesn't work for spark-submmit on yarn?

2015-04-01 Thread Shuai Zheng
Nice.

But my case shows that even when I use yarn-client, I have the same issue. I did 
verify it several times.

And I am running 1.3.0 on EMR (using the version distributed by the installSpark 
script from AWS).

I agree _JAVA_OPTIONS is not the right solution, but I will use it until 1.4.0 
is out :)

Regards,

Shuai

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, April 01, 2015 10:51 AM
To: Shuai Zheng
Cc: Akhil Das; user@spark.apache.org
Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn?

I feel like I recognize that problem, and it's almost the inverse of
https://issues.apache.org/jira/browse/SPARK-3884 which I was looking at today. 
The spark-class script didn't seem to handle all the ways that driver memory 
can be set.

I think this is also something fixed by the new launcher library in 1.4.0.

_JAVA_OPTIONS is not a good solution since it's global.

On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 Hi Akhil,



 Thanks a lot!



 After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception 
 disappeared. But this confuses me: does the --driver-memory option not work 
 for spark-submit to YARN (I haven't checked other clusters)? Is it a bug?



 Regards,



 Shuai





 From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
 Sent: Wednesday, April 01, 2015 2:40 AM
 To: Shuai Zheng
 Cc: user@spark.apache.org
 Subject: Re: --driver-memory parameter doesn't work for spark-submmit 
 on yarn?



 Once you submit the job, do a ps aux | grep spark-submit and see how much 
 heap space is allocated to the process (the -Xmx params); if you are seeing a 
 lower value you could try increasing it yourself with:



 export _JAVA_OPTIONS=-Xmx5g


 Thanks

 Best Regards



 On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 Below is the my shell script:



 /home/hadoop/spark/bin/spark-submit --driver-memory=5G 
 --executor-memory=40G --master yarn-client --class 
 com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar 
 s3://bucket/vriscBatchConf.properties



 My driver will load some resources and then broadcast to all executors.



 That resource is only 600MB in serialized format, but I always get an 
 out-of-memory exception; it looks like the right amount of memory is not 
 allocated to my driver.



 Exception in thread main java.lang.OutOfMemoryError: Java heap space
 at java.lang.reflect.Array.newArray(Native Method)
 at java.lang.reflect.Array.newInstance(Array.java:70)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)



 Am I doing anything wrong here?



 And no matter what I set for the --driver-memory value (from 512M to 20G), it 
 always gives me an error on the same line (the line that tries to load a 600MB 
 Java serialization file). So it looks like the script doesn't allocate the 
 right memory to the driver in my case?



 Regards,



 Shuai




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--driver-memory parameter doesn't work for spark-submmit on yarn?

2015-03-31 Thread Shuai Zheng
Hi All,

 

Below is the my shell script:

 

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G
--master yarn-client --class com.***.FinancialEngineExecutor
/home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties 

 

My driver will load some resources and then broadcast to all executors.

 

That resource is only 600MB in serialized format, but I always get an out-of-memory
exception; it looks like the right amount of memory is not allocated to my
driver.

 

Exception in thread main java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

 

Am I doing anything wrong here? 

 

And no matter what I set for the --driver-memory value (from 512M to 20G), it
always gives me an error on the same line (the line that tries to load a 600MB
Java serialization file). So it looks like the script doesn't allocate the right
memory to the driver in my case?

 

Regards,

 

Shuai



When will 1.3.1 release?

2015-03-30 Thread Shuai Zheng
Hi All,

 

I am waiting for Spark 1.3.1 to fix the bug with the S3 file system. 

 

Does anyone know the release date for 1.3.1? I can't downgrade to 1.2.1 because
there is a jar compatibility issue with the AWS SDK.

 

Regards,

 

Shuai



What is the jvm size when start spark-submit through local mode

2015-03-20 Thread Shuai Zheng
Hi,

 

I am curious: when I start a Spark program in local mode, which parameter is
used to decide the JVM memory size for the executor?

In theory should be:

--executor-memory 20G

 

But I remember that local mode only starts the Spark executor in the same process
as the driver, so it should be:

--driver-memory 20G
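
A quick way to check which setting actually took effect (a sketch only; in local mode the executor threads run inside the driver JVM, so the heap reported here is the one the tasks will see):

    val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("heap-check")
    val sc = new org.apache.spark.SparkContext(conf)
    // Local-mode executors share the driver JVM, so this is the effective task heap.
    println(s"Max heap: ${Runtime.getRuntime.maxMemory / (1024 * 1024)} MB")
    sc.stop()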

 

Regards,

 

Shuai



RE: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-20 Thread Shuai Zheng
Thanks!

 

Let me update the status.

 

I have copied the DirectOutputCommitter to my local. And set:

 

conf.set("spark.hadoop.mapred.output.committer.class",
"org..DirectOutputCommitter")

 

It works perfectly.

 

Thanks everyone :)

 

Regards,

 

Shuai

 

From: Aaron Davidson [mailto:ilike...@gmail.com] 
Sent: Tuesday, March 17, 2015 3:06 PM
To: Imran Rashid
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: Spark will process _temporary folder on S3 is very slow and always 
cause failure

 

Actually, this is the more relevant JIRA (which is resolved):

https://issues.apache.org/jira/browse/SPARK-3595

 

6352 is about saveAsParquetFile, which is not in use here.

 

Here is a DirectOutputCommitter implementation:

https://gist.github.com/aarondav/c513916e72101bbe14ec

 

and it can be configured in Spark with:

sparkConf.set("spark.hadoop.mapred.output.committer.class",
classOf[DirectOutputCommitter].getName)

 

On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid iras...@cloudera.com wrote:

I'm not super familiar w/ S3, but I think the issue is that you want to use a 
different output committer with object stores, which don't have a simple move 
operation.  There have been a few other threads on S3 & output committers.  I 
think the most relevant for you is most probably this open JIRA:

 

https://issues.apache.org/jira/browse/SPARK-6352

 

On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I try to run a sorting on a r3.2xlarge instance on AWS. I just try to run it as 
a single node cluster for test. The data I use to sort is around 4GB and sit on 
S3, output will also on S3.

 

I just connect spark-shell to the local cluster and run the code in the script 
(because I just want a benchmark now).

 

My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,")

parquetFile.registerTempTable("Test")

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }

sortedResult.saveAsTextFile("s3n://myplace,")

 

The job takes around 6 mins to finish the sort when I am monitoring the 
process. After I notice the process stop at: 

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at 
console:31, took 581.304992 s

 

At that time, the spark actually just write all the data to the _temporary 
folder first, after all sub-tasks finished, it will try to move all the ready 
result from _temporary folder to the final location. This process might be 
quick locally (because it will just be a cut/paste), but it looks like very 
slow on my S3, it takes a few second to move one file (usually there will be 
200 partitions). And then it raise exceptions after it move might be 40-50 
files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond

at 
org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)

at 
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)

at 
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)

at 
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)

at 
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I try several times, but never get the full job finished. I am not sure 
anything wrong here, but I use something very basic and I can see the job has 
finished and all result on the S3 under temporary folder, but then it raise the 
exception and fail. 

 

Any special setting I should do here when deal with S3?

 

I don’t know what is the issue here, I never see MapReduce has similar issue. 
So it could not be S3’s problem.

 

Regards,

 

Shuai

 

 



RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Below is the output:

 

core file size  (blocks, -c) 0

data seg size   (kbytes, -d) unlimited

scheduling priority (-e) 0

file size   (blocks, -f) unlimited

pending signals (-i) 1967947

max locked memory   (kbytes, -l) 64

max memory size (kbytes, -m) unlimited

open files  (-n) 2024

pipe size(512 bytes, -p) 8

POSIX message queues (bytes, -q) 819200

real-time priority  (-r) 0

stack size  (kbytes, -s) 8192

cpu time   (seconds, -t) unlimited

max user processes  (-u) 1967947

virtual memory  (kbytes, -v) unlimited

file locks  (-x) unlimited

 

I have set the max open files to 2024 with ulimit -n 2024, but I get the same issue.

I am not sure whether that is a reasonable setting.

 

Actually I am doing a loop; each iteration tries to sort only 3GB of data. It runs
very quickly in the first loop and slows down in the second loop. In each iteration
I start and destroy the context (because I want to clean up the temp files created
under the tmp folder, which take a lot of space). Otherwise it is just the default
settings.

 

My logic:

 

for (...) {
  val sc = new SparkContext(conf)
  val sql = sc.loadParquet(...)
  ... sortByKey ...
  sc.stop()
}

 

And I run on the EC2 c3*8xlarge, Amazon Linux AMI 2014.09.2 (HVM).

 

From: java8964 [mailto:java8...@hotmail.com] 
Sent: Friday, March 20, 2015 3:54 PM
To: user@spark.apache.org
Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException:
File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

 

Do you think the ulimit for the user running Spark on your nodes?

 

Can you run ulimit -a under the user who is running spark on the executor
node? Does the result make sense for the data you are trying to process?

 

Yong

 

  _  

From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File
too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All,

 

I try to run a simple sort by on 1.2.1. And it always give me below two
errors:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6
b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19d
d-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b5326782
6 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because the Spark shuffle creates too many
local temp files (and I don't know the solution, because it looks like my
workaround causes other issues), but I am not sure what the second error
means.

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Hi All,

 

I try to run a simple sort by on 1.2.1. And it always give me below two
errors:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6
b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19d
d-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b5326782
6 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because the Spark shuffle creates too many
local temp files (and I don't know the solution, because it looks like my
workaround causes other issues), but I am not sure what the second error
means.

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



[SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.

2015-03-16 Thread Shuai Zheng
Hi All,

 

I am running Spark 1.2.1 with the AWS SDK. To make sure the AWS SDK is compatible
with httpclient 4.2 (which I assume Spark uses?), I have already downgraded the
SDK to version 1.9.0.

 

But even that, I still got an error:

 

Exception in thread main java.lang.NoSuchMethodError:
org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29)
at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:190)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:410)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:392)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:376)

 

When I search the maillist, it looks the same issue as:

https://github.com/apache/spark/pull/2535

http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-
aws-s3-client-on-spark-while-javap-shows-otherwi

 

But I don't understand the solution mentioned there. The issue is caused by a
pre-packaged DefaultClientConnectionOperator in the Spark all-in-one jar file
which doesn't have that method.

 

I have some questions here:

 

How can we find out which exact versions Spark pre-packages (this is really very
painful), and how can we override them?

 

I have tried:

 

val conf = new SparkConf()
  .set("spark.files.userClassPathFirst", "true")    // for non-YARN apps before Spark 1.3
  .set("spark.executor.userClassPathFirst", "true") // for Spark 1.3.0

But it doesn't work

 

This really creates a lot of issues for me (especially since we don't know which
versions Spark uses to package its own jar, we have to find out by trial and
error). Even Maven doesn't give enough information, because httpclient is not in
the Maven dependency tree (not even as an indirect dependency, after I used tools
to resolve the whole dependency tree).

 

Regards,

 

Shuai



RE: Process time series RDD after sortByKey

2015-03-16 Thread Shuai Zheng
Hi Imran,

 

I am a bit confused here. Assume I have RDD a with 1000 partitions and it has also 
been sorted. When creating RDD b (with 20 partitions), how can I control it so that 
partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control 
code/logic here.

 

You code below:

 

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

 

 

Does it mean I need to define/develop my own MyGroupingRDD class? I am not very 
clear on how to do that; is there any place I can find an example? I have never 
created my own RDD class before (only RDD instances :)). But this is a very valuable 
approach to me, so I am eager to learn.

 

Regards,

 

Shuai

 

From: Imran Rashid [mailto:iras...@cloudera.com] 
Sent: Monday, March 16, 2015 11:22 AM
To: Shawn Zheng; user@spark.apache.org
Subject: Re: Process time series RDD after sortByKey

 

Hi Shuai,

 

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote:

Sorry I responded late.

Zhan Zhang's solution is very interesting and I looked into it, but it is not 
what I want. Basically I want to run the job sequentially and also gain 
parallelism. So, if possible, if I have 1000 partitions, the best case is that I 
can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. 

If we have ability to do this, we will gain huge flexibility when we try to 
process some time series like data and a lot of algo will benefit from it.

 

yes, this is what I was suggesting you do.  You would first create one RDD (a) 
that has 1000 partitions.  Don't worry about the creation of this RDD -- it 
wont' create any tasks, its just a logical holder of your raw data.  Then you 
create another RDD (b) that depends on your RDD (a), but that only has 20 
partitions.  Each partition in (b) would depend on a number of partitions from 
(a).  As you've suggested, partition 1 in (b) would depend on partitions 1-50 
in (a), partition 2 in (b) would depend on 51-100 in (a), etc.   Note that RDD 
(b) still doesn't *do* anything.  Its just another logical holder for your 
data, but this time grouped in the way you want.  Then after RDD (b), you would 
do whatever other transformations you wanted, but now you'd be working w/ 20 
partitions:

 

val rawData1000Partitions = sc.textFile(...) // or whatever

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

 

note that this is almost exactly the same as what CoalescedRdd does.  However, 
it might combine the partitions in whatever ways it feels like -- you want them 
combined in a very particular order.  So you'll need to create your own 
subclass.
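
To make that concrete, here is a minimal sketch of such a subclass (written against the Spark 1.x RDD API and closely following what CoalescedRDD does, but grouping a fixed number of consecutive parent partitions in order; the class name and the group size are illustrative only):

    import scala.reflect.ClassTag
    import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    // One output partition wrapping a consecutive range of parent partition indices.
    class GroupedPartition(val index: Int, val parentIndices: Seq[Int]) extends Partition

    // Groups every `groupSize` consecutive partitions of `parent` into one partition,
    // preserving their order, using only a narrow dependency (no shuffle).
    class OrderedGroupingRDD[T: ClassTag](parent: RDD[T], groupSize: Int)
      extends RDD[T](parent.context, Nil) {

      override def getPartitions: Array[Partition] =
        parent.partitions.indices.grouped(groupSize).zipWithIndex.map {
          case (parentIdxs, i) => new GroupedPartition(i, parentIdxs)
        }.toArray[Partition]

      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
          parent.iterator(parent.partitions(i), context)
        }

      override def getDependencies: Seq[Dependency[_]] = Seq(
        new NarrowDependency(parent) {
          override def getParents(partitionId: Int): Seq[Int] =
            partitions(partitionId).asInstanceOf[GroupedPartition].parentIndices
        }
      )
    }

    // usage, matching the example above:
    // val groupedRawData20Partitions = new OrderedGroupingRDD(rawData1000Partitions, 50)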

 

 

Back to Zhan Zhang's 

while( iterPartition  RDD.partitions.length) {

  val res = sc.runJob(this, (it: Iterator[T]) = somFunc, iterPartition, 
allowLocal = true)

  Some other function after processing one partition.

  iterPartition += 1

}

I am curious how Spark processes this without parallelism: will the individual 
partition be passed back to the driver to process, or will it just run one task on 
the node where that partition exists, followed by another partition on another node?

 

 

Not exactly.  The partition is not shipped back to the driver.  You create a 
task which will be processed by a worker.  The task scheduling will take data 
locality into account, so ideally the task will get scheduled in the same 
location where the data already resides.  The worker will execute someFunc, and 
after its done it will ship the *result* back to the driver.  Then the process 
will get repeated for all the other partitions.

 

If you wanted all the data sent back to the driver, you could use 
RDD.toLocalIterator.  That will send one partition back to the driver, let you 
process it on the driver, then fetch the next partition, etc.

 

 

Imran

 

 



RE: [SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.

2015-03-16 Thread Shuai Zheng
And it is a NoSuchMethodError, not a ClassNotFoundException.

 

And by default I think Spark is only compiled against Hadoop 2.2? 

 

For this issue itself, I just checked the latest Spark (1.3.0), and that version 
can work (because it is packaged with a newer version of httpclient; I can see the 
method is there, although I still don't know the exact version). But this doesn't 
really solve the whole problem: it is very unclear which versions of third-party 
libraries Spark uses, and even if there is some way to figure it out, it still 
seems like a horrible decision to do that.

 

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Monday, March 16, 2015 1:06 PM
To: Shuai Zheng
Cc: user
Subject: Re: [SPARK-3638 ] java.lang.NoSuchMethodError: 
org.apache.http.impl.conn.DefaultClientConnectionOperator.

 

From my local maven repo:

 

$ jar tvf 
~/.m2/repository/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar
 | grep SchemeRegistry

  1373 Fri Apr 19 18:19:36 PDT 2013 
org/apache/http/impl/conn/SchemeRegistryFactory.class

  2954 Fri Apr 19 18:19:36 PDT 2013 
org/apache/http/conn/scheme/SchemeRegistry.class

  2936 Fri Apr 19 18:19:36 PDT 2013 
org/apache/http/auth/AuthSchemeRegistry.class

 

If you run mvn dependency:tree, you would see something similar to the 
following:

 

[INFO] |  +- org.apache.hadoop:hadoop-client:jar:2.6.0:compile

[INFO] |  |  +- org.apache.hadoop:hadoop-common:jar:2.6.0:compile

[INFO] |  |  |  +- commons-cli:commons-cli:jar:1.2:compile

[INFO] |  |  |  +- xmlenc:xmlenc:jar:0.52:compile

[INFO] |  |  |  +- commons-io:commons-io:jar:2.4:compile

[INFO] |  |  |  +- commons-collections:commons-collections:jar:3.2.1:compile

[INFO] |  |  |  +- commons-lang:commons-lang:jar:2.6:compile

[INFO] |  |  |  +- commons-configuration:commons-configuration:jar:1.6:compile

[INFO] |  |  |  |  +- commons-digester:commons-digester:jar:1.8:compile

[INFO] |  |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile

[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile

[INFO] |  |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile

[INFO] |  |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile

[INFO] |  |  |  +- org.apache.avro:avro:jar:1.7.6:compile

[INFO] |  |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile

[INFO] |  |  |  +- com.google.code.gson:gson:jar:2.2.4:compile

[INFO] |  |  |  +- org.apache.hadoop:hadoop-auth:jar:2.6.0:compile

[INFO] |  |  |  |  +- org.apache.httpcomponents:httpclient:jar:4.2.5:compile

 

Cheers

 

On Mon, Mar 16, 2015 at 9:38 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I am running Spark 1.2.1 and AWS SDK. To make sure AWS compatible on the 
httpclient 4.2 (which I assume spark use?), I have already downgrade to the 
version 1.9.0

 

But even that, I still got an error:

 

Exception in thread main java.lang.NoSuchMethodError: 
org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29)
at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:190)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:410)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:392)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:376)

 

When I search the maillist, it looks the same issue as:

https://github.com/apache/spark/pull/2535

http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi

 

But I don't understand the solution mentioned there. The issue is caused by a 
pre-packaged DefaultClientConnectionOperator in the Spark all-in-one jar file 
which doesn't have that method.

 

I have some questions here:

 

How can we find out which exact versions Spark pre-packages (this is really very 
painful), and how can we override them?

 

I have tried:

 

val conf = new SparkConf()
  .set("spark.files.userClassPathFirst", "true")    // for non-YARN apps before Spark 1.3
  .set("spark.executor.userClassPathFirst", "true") // for Spark 1.3.0

But it doesn’t work

 

This really create a lot of issues to me (especially we don’t know what version 
is used by Spark to package

sqlContext.parquetFile doesn't work with s3n in version 1.3.0

2015-03-16 Thread Shuai Zheng
Hi All,

 

I just upgraded the system to version 1.3.0, but now sqlContext.parquetFile
doesn't work with s3n. I have tested the same code with 1.2.1 and it works.

 

A simple test running in spark-shell:

 

val parquetFile = sqlContext.parquetFile("s3n:///test/2.parq")

java.lang.IllegalArgumentException: Wrong FS: s3n:///test/2.parq,
expected: file:///

 

And same test work with spark-shell under 1.2.1

 

Regards,

 

Shuai



RE: sqlContext.parquetFile doesn't work with s3n in version 1.3.0

2015-03-16 Thread Shuai Zheng
I see, but this is really a big issue. Is there any way for me to work around it?
I tried to set fs.default.name = s3n, but it doesn't seem to work.

 

I must upgrade to 1.3.0 because I hit the package incompatibility issue in 1.2.1,
and if I must patch something, I would rather go with the latest version.

 

Regards,

 

Shuai

 

From: Kelly, Jonathan [mailto:jonat...@amazon.com] 
Sent: Monday, March 16, 2015 2:54 PM
To: Shuai Zheng; user@spark.apache.org
Subject: Re: sqlContext.parquetFile doesn't work with s3n in version 1.3.0

 

See https://issues.apache.org/jira/browse/SPARK-6351

 

~ Jonathan

 

From: Shuai Zheng szheng.c...@gmail.com
Date: Monday, March 16, 2015 at 11:46 AM
To: user@spark.apache.org user@spark.apache.org
Subject: sqlContext.parquetFile doesn't work with s3n in version 1.3.0

 

Hi All,

 

I just upgraded the system to version 1.3.0, but now sqlContext.parquetFile
doesn't work with s3n. I have tested the same code with 1.2.1 and it works.

 

A simple test running in spark-shell:

 

val parquetFile = sqlContext.parquetFile("s3n:///test/2.parq")

java.lang.IllegalArgumentException: Wrong FS: s3n:///test/2.parq,
expected: file:///

 

And same test work with spark-shell under 1.2.1

 

Regards,

 

Shuai



Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-13 Thread Shuai Zheng
Hi All,

 

I am trying to run a sort on an r3.2xlarge instance on AWS, just as a single-node
cluster for testing. The data I use for the sort is around 4GB and sits on S3;
the output will also go to S3.

 

I just connect spark-shell to the local cluster and run the code in the
script (because I just want a benchmark now).

 

My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,")

parquetFile.registerTempTable("Test")

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }

sortedResult.saveAsTextFile("s3n://myplace,")

 

The job takes around 6 mins to finish the sort when I am monitoring the
process. After I notice the process stop at: 

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
console:31, took 581.304992 s

 

At that point Spark has actually just written all the data to the _temporary
folder first; after all sub-tasks finish, it tries to move all the ready results
from the _temporary folder to the final location. This process might be quick
locally (because it is just a cut/paste), but it looks very slow on my S3: it
takes a few seconds to move one file (and usually there are 200 partitions). It
then raises exceptions after it has moved maybe 40-50 files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I have tried several times, but never got the full job to finish. I am not sure
whether anything is wrong here; I am using something very basic and I can see the
job has finished and all the results are on S3 under the temporary folder, but
then it raises the exception and fails.

 

Any special setting I should do here when deal with S3?

 

I don't know what the issue is here; I have never seen MapReduce have a similar
issue, so it should not be S3's problem.

 

Regards,

 

Shuai



RE: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-13 Thread Shuai Zheng
And one thing I forgot to mention: even though I get this exception and the result
is not well formed in my target folder (part of it is there, the rest is under a
different folder structure inside the _temporary folder), in the web UI of
spark-shell it is still marked as a successful step. I think this is a
bug?

 

Regards,

 

Shuai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Friday, March 13, 2015 6:51 PM
To: user@spark.apache.org
Subject: Spark will process _temporary folder on S3 is very slow and always
cause failure

 

Hi All,

 

I try to run a sorting on a r3.2xlarge instance on AWS. I just try to run it
as a single node cluster for test. The data I use to sort is around 4GB and
sit on S3, output will also on S3.

 

I just connect spark-shell to the local cluster and run the code in the
script (because I just want a benchmark now).

 

My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,")

parquetFile.registerTempTable("Test")

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }

sortedResult.saveAsTextFile("s3n://myplace,")

 

The job takes around 6 mins to finish the sort when I am monitoring the
process. After I notice the process stop at: 

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
console:31, took 581.304992 s

 

At that time, the spark actually just write all the data to the _temporary
folder first, after all sub-tasks finished, it will try to move all the
ready result from _temporary folder to the final location. This process
might be quick locally (because it will just be a cut/paste), but it looks
like very slow on my S3, it takes a few second to move one file (usually
there will be 200 partitions). And then it raise exceptions after it move
might be 40-50 files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I try several times, but never get the full job finished. I am not sure
anything wrong here, but I use something very basic and I can see the job
has finished and all result on the S3 under temporary folder, but then it
raise the exception and fail. 

 

Any special setting I should do here when deal with S3?

 

I don't know what is the issue here, I never see MapReduce has similar
issue. So it could not be S3's problem.

 

Regards,

 

Shuai



jar conflict with Spark default packaging

2015-03-13 Thread Shuai Zheng
Hi All,

 

I am running spark to deal with AWS.

 

And the latest AWS SDK version works with httpclient 3.4+. But the
spark-assembly-*-.jar file has packaged an old httpclient version, which gives me:
ClassNotFoundException for
org/apache/http/client/methods/HttpPatch

 

Even when I put the right httpclient jar there, it doesn't help because Spark
always takes the class from its own packaging first. 

 

I don't know why Spark only provides one big package which doesn't allow us to
customize the library loading order. I know I can just rebuild Spark, but this is
very troublesome, and it should not be the general long-term solution (I can't
rebuild the Spark jar every time I have a jar conflict, since Spark is supposed
to run as a cluster).

 

In Hadoop we have mapreduce.job.user.classpath.first=true, but
spark.yarn.user.classpath.first only works for YARN.

 

I think I am not the only one who faces this issue. Does anyone have a more
general solution for this?

 

Regards,

 

Shuai

 

 



How to pass parameter to spark-shell when choose client mode --master yarn-client

2015-03-10 Thread Shuai Zheng
Hi All,

 

I am trying to pass parameters to spark-shell when I do some tests:

 

spark-shell --driver-memory 512M --executor-memory 4G --master
spark://:7077 --conf spark.sql.parquet.compression.codec=snappy --conf
spark.sql.parquet.binaryAsString=true

 

This works fine on my local PC. But when I start EMR and pass similar
parameters:

 

~/spark/bin/spark-shell --executor-memory 40G --master yarn-client
--num-executors 1 --executor-cores 32 --conf
spark.sql.parquet.compression.codec=snappy --conf
spark.sql.parquet.binaryAsString=true --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer

 

The parameters don't get passed to spark-shell. Does anyone know why? And what
are the alternatives for me to do that?

 

Regards,

 

Shuai



Read Parquet file from scala directly

2015-03-09 Thread Shuai Zheng
Hi All,

 

I have a lot of parquet files, and I want to open them directly instead of
loading them into an RDD in the driver (so I can optimize some performance
through special logic).

But I have done some research online and can't find any example of accessing
parquet files directly from Scala. Has anyone done this before?
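
For what it's worth, one sketch that should be close (assuming the parquet-avro module of that era; the package moved to org.apache.parquet and the reader API changed in later releases, and the path and column name here are only illustrative):

    import org.apache.avro.generic.GenericRecord
    import org.apache.hadoop.fs.Path
    import parquet.avro.AvroParquetReader

    object DirectParquetRead {
      def main(args: Array[String]): Unit = {
        // read a single Parquet file record by record, without any RDD
        val reader = new AvroParquetReader[GenericRecord](new Path("/data/20150101a.parq"))
        try {
          var record = reader.read()
          while (record != null) {
            println(record.get("time")) // "time" is a hypothetical column name
            record = reader.read()
          }
        } finally {
          reader.close()
        }
      }
    }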

 

Regards,

 

Shuai



How to preserve/preset partition information when load time series data?

2015-03-09 Thread Shuai Zheng
Hi All,

 

If I have a set of time series data files, they are in parquet format and the
data for each day is stored following a naming convention, but I will not know
how many files there are for one day.

 

20150101a.parq

20150101b.parq

20150102a.parq

20150102b.parq

20150102c.parq

.

201501010a.parq

.

 

Now I am trying to write a program to process the data, and I want to make sure
each day's data goes into one partition. Of course I can load everything into one
big RDD and then repartition, but that will be very slow. As I already know the
time series from the file names, is it possible for me to load the data into the
RDD and also preserve the partitioning? I know I can preserve one partition per
file, but is there any way for me to load the RDD and preserve partitioning based
on a set of files, i.e. one partition for multiple files?

 

I think it should be possible, because when I load an RDD from 100 files (assume
they cover 100 days), I will have 100 partitions (if I disable file splitting
when loading). Then I could use a special coalesce to repartition the RDD. But I
don't know whether that is possible in current Spark?
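
A minimal sketch of the idea (assuming the Spark 1.3 DataFrame API and a hypothetical filesFor(day) helper that lists that day's file names): build one RDD per day from its own files, collapse it to a single partition with a shuffle-free coalesce, then union the days so each day stays in its own partition:

    val perDay = days.map { day =>
      // filesFor(day) is an assumed helper returning e.g. Seq("20150101a.parq", "20150101b.parq")
      sqlContext.parquetFile(filesFor(day): _*).rdd.coalesce(1)
    }
    // union keeps the existing partitions, so the result has one partition per day
    val allDays = sc.union(perDay)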

 

Regards,

 

Shuai 



Process time series RDD after sortByKey

2015-03-09 Thread Shuai Zheng
Hi All,

 

I am processing some time series data. One day might have 500GB, so each hour is
around 20GB of data.

 

I need to sort the data before I start processing. Assume I can sort it
successfully:

 

dayRDD.sortByKey

 

but after that, I might have thousands of partitions (needed to make the sort
succeed), say 1000 partitions. Then I try to process the data by hour (it does
not need to be exactly one hour, just some similar time frame), and I can't just
repartition down to 24, because then one partition might be too big to fit into
memory (if it is 20GB). So is there any way for me to process the underlying
partitions in a certain order? Basically I want to call mapPartitionsWithIndex
with a range of indices.

 

Is there any way to do it? I hope I have described my issue clearly. :)
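
One approach that might work (a sketch only; start, end and processHour are assumptions standing in for the real hour boundaries and per-hour logic): run one pass per index range and let the other partitions emit nothing:

    // process only partitions [start, end) of the sorted RDD in this pass
    val hourResult = sortedRdd.mapPartitionsWithIndex { (idx, iter) =>
      if (idx >= start && idx < end) processHour(iter) else Iterator.empty
    }
    hourResult.count() // or save it, to actually trigger the pass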

 

Regards,

 

Shuai

 

 



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-24 Thread Shuai Zheng
Hi Imran,

 

I will say your explanation is extremely helpful :)

 

I tested some ideas based on your explanation and it makes perfect sense to me. I 
modified my code to use cogroup+mapValues instead of union+reduceByKey to preserve 
the partitioning, which gives me more than 100% performance gain (for the loop 
part). 

 

Thanks a lot!

 

And I am curious: is there any easy way for me to get a detailed DAG execution 
plan description without running the code? Just like the explain command in Pig 
or SQL?

 

Shuai

 

From: Imran Rashid [mailto:iras...@cloudera.com] 
Sent: Monday, February 23, 2015 6:00 PM
To: Shuai Zheng
Cc: Shao, Saisai; user@spark.apache.org
Subject: Re: Union and reduceByKey will trigger shuffle even same partition?

 

I think you're getting tripped up by lazy evaluation and the way stage boundaries 
work (admittedly it's pretty confusing in this case).

 

It is true that up until recently, if you unioned two RDDs with the same 
partitioner, the result did not have the same partitioner.  But that was just 
fixed here:

https://github.com/apache/spark/pull/4629

 

That does mean that after you update ranks, it will no longer have a partitioner, 
which will affect the join on your second iteration here:

 val contributions = links.join(ranks).flatMap

 

But, I think most of the shuffles you are pointing to are a different issue.  I 
may be belaboring something you already know, but I think this is easily 
confusing.  I think the first thing is understanding where you get stage 
boundaries, and how they are named.  Each shuffle introduces a stage boundary.  
However, the stages get named by the last thing in a stage, which is not really 
what is always causing the shuffle.  E.g., reduceByKey() causes a shuffle, but we 
don't see that in a stage name.  Similarly, map() does not cause a shuffle, but 
we see a stage with that name.

 

So, what do the stage boundaries we see actually correspond to?

 

1) map -- that is doing the shuffle write for the following groupByKey

2) groupByKey -- in addition to reading the shuffle output from your map, this 
is *also* doing the shuffle write for the next shuffle you introduce w/ 
partitionBy

3) union -- this is doing the shuffle reading from your partitionBy, and then 
all the work from there right up until the shuffle write for what is 
immediately after union -- your reduceByKey.

4) lookup is an action, which is why that has another stage.

 

a couple of things to note:
(a) your join does not cause a shuffle, b/c both rdds share a partitioner

(b) you have two shuffles from groupByKey followed by partitionBy -- you really 
probably want the 1 arg form of groupByKey(partitioner)
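
In code, that one-argument form would look something like this (a sketch based on the example earlier in the thread, with org.apache.spark.HashPartitioner imported and numOfPartition defined as there); it builds the grouped, partitioned links RDD with a single shuffle instead of groupByKey plus a separate partitionBy:

    val links = sc.textFile("c:/Download/web-Google.txt")
      .filter(!_.contains("#"))
      .map { line => val part = line.split("\t"); (part(0).toInt, part(1).toInt) }
      .groupByKey(new HashPartitioner(numOfPartition))   // one shuffle, already hash-partitioned
      .persist()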

 

 

hopefully this is helpful to understand how your stages & shuffles correspond 
to your code.

 

Imran

 

 

 

On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng szheng.c...@gmail.com wrote:

This also trigger an interesting question:  how can I do this locally by code 
if I want. For example: I have RDD A and B, which has some partition, then if I 
want to join A to B, I might just want to do a mapper side join (although B 
itself might be big, but B’s local partition is known small enough put in 
memory), how can I access other RDD’s local partition in the mapParitition 
method? Is it anyway to do this in Spark?

 

From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

 

If you call reduceByKey(), internally Spark will introduce a shuffle operation, 
no matter whether the data is already partitioned locally; Spark itself does not 
know the data is already well partitioned.

 

So if you want to avoid the shuffle, you have to write the code explicitly to 
avoid this, from my understanding. You can call mapPartitions to get a partition 
of data and reduce by key locally with your own logic.

 

Thanks

Saisai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

 

Hi All,

 

I am running a simple page rank program, but it is slow. And I dig out part of 
reason is there is shuffle happen when I call an union action even both RDD 
share the same partition:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6

val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist

var ranks = links.mapValues(_ => 1.0)

var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {

  val contributions = links.join(ranks).flatMap {

    case (pageId, (links, rank

RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
This also triggers an interesting question: how can I do this locally in code if
I want to? For example: I have RDDs A and B, which have some partitioning; if I
want to join A to B, I might just want to do a map-side join (although B itself
might be big, B's local partition is known to be small enough to fit in memory).
How can I access another RDD's local partition in the mapPartitions method? Is
there any way to do this in Spark?
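
For the common case where the whole of B (not just one partition) fits in memory, one sketch is to collect B and broadcast it, then join inside mapPartitions; rddA, rddB and the key/value shapes here are assumptions:

    // collect the small side once and ship it to every executor
    val bLocal = sc.broadcast(rddB.collectAsMap())
    val joined = rddA.mapPartitions { iter =>
      val lookup = bLocal.value
      // keep only keys present on both sides, like an inner join
      iter.flatMap { case (k, v) => lookup.get(k).map(bv => (k, (v, bv))) }
    }

Reading another RDD's partition from inside a task is not something the public API supports directly, which is why the broadcast route is the usual workaround.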

 

From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

 

If you call reduceByKey(), internally Spark will introduce a shuffle operation,
no matter whether the data is already partitioned locally; Spark itself does not
know the data is already well partitioned.

 

So if you want to avoid the shuffle, you have to write the code explicitly to
avoid this, from my understanding. You can call mapPartitions to get a partition
of data and reduce by key locally with your own logic.

 

Thanks

Saisai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

 

Hi All,

 

I am running a simple page rank program, but it is slow. And I dig out part
of reason is there is shuffle happen when I call an union action even both
RDD share the same partition:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6

val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist

var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}

ranks.lookup(1)

 

In above code, links will join ranks and should preserve the partition, and
leakedMatrix also share the same partition, so I expect there is no shuffle
happen on the contributions.union(leakedMatrix), also on the coming
reduceByKey after that. But finally there is shuffle write for all steps,
map, groupByKey, Union, partitionBy, etc.

 

I expect there should only happen once on the shuffle then all should local
operation, but the screen shows not, do I have any misunderstanding here?

 





Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
Hi All,

 

I am running a simple PageRank program, but it is slow. And I dug out part of the
reason: a shuffle happens when I call a union even though both RDDs share the
same partitioner:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6

val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist

var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}

ranks.lookup(1)

 

In the above code, links joins ranks and should preserve the partitioning, and
leakedMatrix also shares the same partitioner, so I expected no shuffle to happen
on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in
the end there is a shuffle write for all steps: map, groupByKey, union,
partitionBy, etc.

 

I expected the shuffle to happen only once and everything after that to be a
local operation, but the UI shows otherwise. Do I have any misunderstanding here?

 





RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
In the book Learning Spark (the screenshot I pasted is not included here):

So here it means that no shuffle happens across the network, but a shuffle is
still done locally? Even if that is the case, why would union trigger a
shuffle? I think union should just append the RDDs together.
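One quick way to check this in the shell, assuming the RDDs from the code quoted below: compare the partitioner before and after the union (if the union result reports no partitioner, the following reduceByKey has to repartition).

// If union drops the partitioner, the next reduceByKey must shuffle.
println(links.partitioner)        // Some(HashPartitioner) because of partitionBy
println(contributions.partitioner)
println(contributions.union(leakedMatrix).partitioner)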

 

From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

 

If you call reduceByKey(), internally Spark will introduce a shuffle
operation, no matter whether the data is already partitioned locally; Spark
itself does not know that the data is already well partitioned.

 

So if you want to avoid a shuffle, you have to write the code explicitly to
avoid it, from my understanding. You can call mapPartitions to get each
partition of data and reduce by key locally with your own logic.

 

Thanks

Saisai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

 

Hi All,

 

I am running a simple PageRank program, but it is slow. I dug out that part
of the reason is that a shuffle happens when I call union, even though both
RDDs share the same partitioner:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

 

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6

val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist

var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}

ranks.lookup(1)

 

In the above code, links joins ranks and should preserve the partitioning, and
leakedMatrix also shares the same partitioner, so I expect no shuffle to happen
on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But
in the end there is shuffle write for all steps: map, groupByKey, union,
partitionBy, etc.

I expect the shuffle to happen only once, with everything after that being a
local operation, but the UI shows otherwise. Do I have a misunderstanding here?

 





RE: How to design a long live spark application

2015-02-06 Thread Shuai Zheng
Thanks. I thought about it; yes, the DAG engine should have no issue building 
the right graph from different threads (at least in theory it is not a problem).

So now I have another question: if I have a context initialized but there is no 
operation on it for a very long time, will there be a timeout on it? How does 
Spark control/maintain/detect the liveness of the client spark context?

Do I need to setup something special?

 

Regards,

 

Shuai

 

From: Eugen Cepoi [mailto:cepoi.eu...@gmail.com] 
Sent: Thursday, February 05, 2015 5:39 PM
To: Shuai Zheng
Cc: Corey Nolet; Charles Feduke; user@spark.apache.org
Subject: Re: How to design a long live spark application

 

Yes you can submit multiple actions from different threads to the same 
SparkContext. It is safe.
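A minimal sketch of that pattern (illustrative workloads only), submitting two actions concurrently against the one shared context via Scala futures:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// sc is the single shared SparkContext
val jobA = Future { sc.parallelize(1 to 1000000).filter(_ % 2 == 0).count() }
val jobB = Future { sc.parallelize(1 to 1000000).map(_ * 2).count() }

// Each action becomes its own job/DAG; the scheduler runs them independently.
println(Await.result(jobA, 10.minutes))
println(Await.result(jobB, 10.minutes))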

Indeed what you want to achieve is quite common. Expose some operations over a 
SparkContext through HTTP.

I have used spray for this and it just worked fine.

At bootstrap of your web app, start a sparkcontext, maybe preprocess some data 
and cache it, then start accepting requests against this sc. Depending where 
you place the initialization code, you can block the server from initializing 
until your context is ready. This is nice if you don't want to accept requests 
while the context is being prepared.
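A rough sketch of that bootstrap shape. Eugen used spray; to keep the sketch dependency-free it uses the JDK's built-in HTTP server instead, and the endpoint, port and cached dataset are all made up:

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.{SparkConf, SparkContext}

object LongLivedServer {
  def main(args: Array[String]): Unit = {
    // Create the context once at bootstrap and cache some data.
    val sc = new SparkContext(new SparkConf().setAppName("long-lived").setMaster("local[*]"))
    val cached = sc.parallelize(1 to 1000000).cache()
    cached.count() // force materialization before accepting requests

    val server = HttpServer.create(new InetSocketAddress(8080), 0)
    server.createContext("/count", new HttpHandler {
      def handle(exchange: HttpExchange): Unit = {
        val body = cached.count().toString.getBytes("UTF-8") // one Spark job per request
        exchange.sendResponseHeaders(200, body.length)
        exchange.getResponseBody.write(body)
        exchange.close()
      }
    })
    server.start() // sc stays alive for the lifetime of the server
  }
}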

 

 

Eugen

 

 

2015-02-05 23:22 GMT+01:00 Shuai Zheng szheng.c...@gmail.com:

This example helps a lot :)

 

But I am thinking a below case:

 

Assume I have a SparkContext as a global variable. 

Then if I use multiple threads to access/use it. Will it mess up?

 

For example:

 

My code:

 

public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, 
    Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el)
    throws IOException, InterruptedException {

  JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);
  lines.map(…)
  …
  lines.count()
}

 

If I have two threads calling this method at the same time and passing in the same 
SparkContext:

Is SparkContext thread-safe? I am a bit worried here; in traditional Java it should 
be, but in the Spark context I am not 100% sure.

Basically the SparkContext needs to be smart enough to differentiate the different 
method contexts (RDDs added to it from different methods), so it creates a different 
DAG for each method.

Can anyone confirm this? It is not something I can easily test with code. 
Thanks!

 

Regards,

 

Shuai

 

From: Corey Nolet [mailto:cjno...@gmail.com] 
Sent: Thursday, February 05, 2015 11:55 AM
To: Charles Feduke
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: How to design a long live spark application

 

Here's another lightweight example of running a SparkContext in a common java 
servlet container: https://github.com/calrissian/spark-jetty-server

 

On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com 
wrote:

If you want to design something like Spark shell have a look at:

 

http://zeppelin-project.org/

 

It's open source and may already do what you need. If not, its source code will 
be helpful in answering your questions about how to integrate with the long-running 
jobs that you have.

 

On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote:

You can check out https://github.com/spark-jobserver/spark-jobserver - this 
allows several users to upload their jars and run jobs with a REST interface.

 

However, if all users are using the same functionality, you can write a simple 
spray server which will act as the driver and hosts the spark context+RDDs, 
launched in client mode.

 

On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I want to develop a server side application:

 

User submits a request -> the server runs a Spark application and returns (this might 
take a few seconds).

So I want the server to keep a long-lived context; I don't know whether this is 
reasonable or not.

Basically I try to have a global JavaSparkContext instance and keep it there, and 
initialize some RDDs. Then my Java application will use it to submit jobs.

So now I have some questions:

1, If I don't close it, is there any timeout I need to configure on the Spark 
server?

2, In theory I want to design something similar to the Spark shell (which also hosts 
a default sc), just not shell based.

Any suggestions? I think my request is very common for application development; 
surely someone has done this before?

 

Regards,

 

Shawn

 

 

 



How to design a long live spark application

2015-02-05 Thread Shuai Zheng
Hi All,

 

I want to develop a server side application:

 

User submits a request -> the server runs a Spark application and returns (this might
take a few seconds).

So I want the server to keep a long-lived context; I don't know whether this is
reasonable or not.

Basically I try to have a global JavaSparkContext instance and keep it there, and
initialize some RDDs. Then my Java application will use it to submit jobs.

So now I have some questions:

1, If I don't close it, is there any timeout I need to configure on the Spark
server?

2, In theory I want to design something similar to the Spark shell (which also
hosts a default sc), just not shell based.

Any suggestions? I think my request is very common for application development;
surely someone has done this before?

 

Regards,

 

Shawn



Use Spark as multi-threading library and deprecate web UI

2015-02-05 Thread Shuai Zheng
Hi All,

 

It might sound weird, but I think Spark is perfect to be used as a
multi-threading library in some cases. The local mode will naturally spin up
multiple threads when required, and it is more restrictive, with less chance
of potential bugs in the code (because it is data oriented, not thread
oriented). Of course, it cannot be used for all cases, but for most of my
applications (90%) it is enough.

 

I want to hear other people's idea about this.

 

BTW: if I run Spark in local mode, how do I deprecate the web UI (which listens
on 4040 by default)? I don't want to start the UI every time I use Spark as a
local library.

 

Regards,

 

Shuai



RE: How to design a long live spark application

2015-02-05 Thread Shuai Zheng
This example helps a lot :)

 

But I am thinking a below case:

 

Assume I have a SparkContext as a global variable. 

Then if I use multiple threads to access/use it. Will it mess up?

 

For example:

 

My code:

 

public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, 
    Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el)
    throws IOException, InterruptedException {

  JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);
  lines.map(…)
  …
  lines.count()
}

 

If I have two threads calling this method at the same time and passing in the same 
SparkContext:

Is SparkContext thread-safe? I am a bit worried here; in traditional Java it should 
be, but in the Spark context I am not 100% sure.

Basically the SparkContext needs to be smart enough to differentiate the different 
method contexts (RDDs added to it from different methods), so it creates a different 
DAG for each method.

Can anyone confirm this? It is not something I can easily test with code. 
Thanks!

 

Regards,

 

Shuai

 

From: Corey Nolet [mailto:cjno...@gmail.com] 
Sent: Thursday, February 05, 2015 11:55 AM
To: Charles Feduke
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: How to design a long live spark application

 

Here's another lightweight example of running a SparkContext in a common java 
servlet container: https://github.com/calrissian/spark-jetty-server

 

On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com 
wrote:

If you want to design something like Spark shell have a look at:

 

http://zeppelin-project.org/

 

It's open source and may already do what you need. If not, its source code will 
be helpful in answering your questions about how to integrate with the long-running 
jobs that you have.

 

On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote:

You can check out https://github.com/spark-jobserver/spark-jobserver - this 
allows several users to upload their jars and run jobs with a REST interface.

 

However, if all users are using the same functionality, you can write a simple 
spray server which will act as the driver and hosts the spark context+RDDs, 
launched in client mode.

 

On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I want to develop a server side application:

 

User submits a request -> the server runs a Spark application and returns (this might 
take a few seconds).

So I want the server to keep a long-lived context; I don't know whether this is 
reasonable or not.

Basically I try to have a global JavaSparkContext instance and keep it there, and 
initialize some RDDs. Then my Java application will use it to submit jobs.

So now I have some questions:

1, If I don't close it, is there any timeout I need to configure on the Spark 
server?

2, In theory I want to design something similar to the Spark shell (which also hosts 
a default sc), just not shell based.

Any suggestions? I think my request is very common for application development; 
surely someone has done this before?

 

Regards,

 

Shawn

 

 



RE: Use Spark as multi-threading library and deprecate web UI

2015-02-05 Thread Shuai Zheng
Nice. I just try and it works. Thanks very much!

And I notice the following in the log:

15/02/05 11:19:09 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@NY02913D.global.local:8162]
15/02/05 11:19:10 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
akka.tcp://sparkDriver@NY02913D.global.local:8162/user/HeartbeatReceiver

As I understand it, local mode will have the driver and executors in the same Java 
process. So is there any way for me to also disable the above two listeners? Or are 
they not optional even in local mode?

Regards,

Shuai 



-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, February 05, 2015 10:53 AM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: Use Spark as multi-threading library and deprecate web UI

Do you mean disable the web UI? spark.ui.enabled=false

Sure, it's useful with master = local[*] too.
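For reference, a minimal sketch of using that setting from code when Spark is embedded as a local library (app name is illustrative):

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[*]")
  .setAppName("embedded")
  .set("spark.ui.enabled", "false") // no web UI on port 4040
val sc = new org.apache.spark.SparkContext(conf)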

On Thu, Feb 5, 2015 at 9:30 AM, Shuai Zheng szheng.c...@gmail.com wrote:
 Hi All,



 It might sound weird, but I think Spark is perfect to be used as a 
 multi-threading library in some cases. The local mode will naturally 
 spin up multiple threads when required, and it is more restrictive, with 
 less chance of potential bugs in the code (because it is data oriented, 
 not thread oriented). Of course, it cannot be used for all 
 cases, but for most of my applications (90%) it is enough.



 I want to hear other people’s idea about this.



 BTW: if I run Spark in local mode, how do I deprecate the web UI 
 (which listens on 4040 by default)? I don't want to start the UI every 
 time I use Spark as a local library.



 Regards,



 Shuai


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor vs Mapper in Hadoop

2015-01-16 Thread Shuai Zheng
Thanks a lot for clarifying it.
Following on, I have some questions:

1, If normally we have 1 executor per machine: suppose we have a cluster
with different hardware capacity, for example one 8-core worker and one
4-core worker (ignoring the driver machine). If we set executor-cores=4,
then the 8-core worker will have 2 executors running, am I right? If we set
executor-cores=8, then the 4-core worker can't start any executor. A similar
idea also applies to memory allocation, so should we always use the same
hardware configuration for the worker machines in a Spark cluster?

2, If I run Spark on YARN (actually EMR), where can I check/configure the
default executor-cores?

Regards,

Shuai



On Thu, Jan 15, 2015 at 11:13 PM, Sean Owen so...@cloudera.com wrote:

 An executor is specific to a Spark application, just as a mapper is
 specific to a MapReduce job. So a machine will usually be running many
 executors, and each is a JVM.

 A Mapper is single-threaded; an executor can run many tasks (possibly
 from different jobs within the application) at once. Yes, 5 executors
 with 4 cores should be able to process 20 tasks in parallel.

 In any normal case, you have 1 executor per machine per application.
 There are cases where you would make more than 1, but these are
 unusual.

 On Thu, Jan 15, 2015 at 8:16 PM, Shuai Zheng szheng.c...@gmail.com
 wrote:
  Hi All,
 
 
 
  I try to clarify some behavior in the spark for executor. Because I am
 from
  Hadoop background, so I try to compare it to the Mapper (or reducer) in
  hadoop.
 
 
 
  1, Each node can have multiple executors, each run in its own process?
 This
  is same as mapper process.
 
 
 
  2, I thought the spark executor will use multi-thread mode when there are
  more than 1 core to allocate to it (for example: set executor-cores to
 5).
  In this way, how many partition it can process? For example, if input
 are 20
  partitions (similar as 20 split as mapper input) and we have 5 executors,
  each has 4 cores. Will all these partitions will be proceed as the same
 time
  (so each core process one partition) or actually one executor can only
 run
  one partition at the same time?
 
 
 
  I don’t know whether my understand is correct, please suggest.
 
 
 
  BTW: In general practice, should we always try to set the executor-cores
 to
  a higher number? So we will favor 10 cores * 2 executor than 2 cores*10
  executors? Any suggestion here?
 
 
 
  Thanks!
 
 
 
  Regards,
 
 
 
  Shuai



RE: Determine number of running executors

2015-01-16 Thread Shuai Zheng
Hi Tobias,

 

Can you share more information about how do you do that? I also have similar 
question about this.

 

Thanks a lot,

 

Regards,

 

Shuai

 

From: Tobias Pfeiffer [mailto:t...@preferred.jp] 
Sent: Wednesday, November 26, 2014 12:25 AM
To: Sandy Ryza
Cc: Yanbo Liang; user
Subject: Re: Determine number of running executors

 

Hi,

 

Thanks for your help!

 

Sandy, I had a bit of trouble finding the spark.executor.cores property. (It 
wasn't there although its value should have been 2.)

I ended up throwing regular expressions at 
scala.util.Properties.propOrElse("sun.java.command", ""), which worked 
surprisingly well ;-)
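A small sketch of that trick, assuming the flag was actually passed on the command line (the regex and the fallback value are illustrative):

val cmd = scala.util.Properties.propOrElse("sun.java.command", "")
val executorCores = """--executor-cores\s+(\d+)""".r
  .findFirstMatchIn(cmd)
  .map(_.group(1).toInt)
  .getOrElse(1) // fall back to a default when the flag is absent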

 

Thanks

Tobias

 



Executor parameter doesn't work for Spark-shell on EMR Yarn

2015-01-15 Thread Shuai Zheng
Hi All,

 

I am testing Spark on EMR cluster. Env is a one node cluster r3.8xlarge. Has
32 vCore and 244G memory.

 

But the command line I use to start up spark-shell doesn't work. For
example:

 

~/spark/bin/spark-shell --jars
/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6
--executor-memory 10G

 

Neither num-executors nor memory setup works.

 

And more interesting, if I use test code:

val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75",
  "-240990|161324,9051480,0,2,30.48,75"))

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

 

It will start 32 executors (so I assume it tries to start an executor for
every vCore).

 

But if I use some real data to do it (the file size is 200M):

val lines = sc.textFile("s3://.../part-r-0")

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

It will only start 4 executors, which maps to the number of HDFS splits (200M
will have 4 splits).

 

So I have two questions:

1, Why is the setup parameter ignored by YARN? How can I limit the number of
executors I run?

2, Why does my much smaller test data set trigger 32 executors while my real
200M data set only gets 4 executors?

 

So how should I control the executor setup in spark-shell? I also printed
the SparkConf; it contains much less than I expect, and I don't see the
parameters I passed in there.

 

scala> sc.getConf.getAll.foreach(println)

(spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0)

(spark.app.id,local-1421353031552)

(spark.eventLog.enabled,true)

(spark.executor.id,driver)

(spark.repl.class.uri,http://10.181.82.38:58415)

(spark.driver.host,ip-10-181-82-38.ec2.internal)

(spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70)

(spark.app.name,Spark shell)

(spark.fileserver.uri,http://10.181.82.38:54666)

(spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk
-1.9.14.jar)

(spark.eventLog.dir,hdfs:///spark-logs)

(spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hado
op/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hado
op/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

(spark.master,local[*])

(spark.driver.port,54191)

(spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop
/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop
/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

 

I searched the old threads; the attached email answers the question of why the
vCore setup doesn't work. But I think that is not the same issue as mine.
Otherwise the default YARN Spark setup couldn't be adjusted at all?

 

Regards,

 

Shuai

 

 

 

 

---BeginMessage---
If you are using the capacity scheduler in YARN: by default the YARN capacity
scheduler uses DefaultResourceCalculator, which considers only memory while
allocating containers. You can use DominantResourceCalculator instead; it
considers memory and CPU.
In capacity-scheduler.xml set
yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator


On 04/11/14 3:03 am, Gen gen.tan...@gmail.com wrote:

Hi,

Well, I didn't find the original documentation, but according to
http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage,
the vcores setting is not about physical CPU cores but virtual cores.
I also used the top command to monitor CPU utilization during the Spark
task; Spark can use all the CPUs even if I leave --executor-cores at the
default (1).

Hope that it can be of help.
Cheers
Gen


Gen wrote
 Hi,
 
 Maybe it is a stupid question, but I am running spark on yarn. I request
 the resources by the following command:
 {code}
 ./spark-submit --master yarn-client --num-executors #number of worker
 --executor-cores #number of cores. ...
 {code}
 However, after launching the task, I use
  yarn node -status ID
 to monitor the situation of the cluster. It shows that the number of Vcores
 used for each container is always 1, no matter what number I pass via
 --executor-cores.
 Any ideas how to solve this problem? Thanks a lot in advance for your
 help.
 
 Cheers
 Gen





--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot-
change-vcores-in-yarn-tp17883p17992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


---End Message---


Executor vs Mapper in Hadoop

2015-01-15 Thread Shuai Zheng
Hi All,

 

I am trying to clarify some executor behavior in Spark. Because I am from a
Hadoop background, I try to compare it to the Mapper (or Reducer) in Hadoop.

1, Each node can have multiple executors, each running in its own process?
This is the same as a mapper process.

2, I thought the Spark executor would use multi-threading when there is more
than 1 core allocated to it (for example, executor-cores set to 5). In that
case, how many partitions can it process? For example, if the input is 20
partitions (similar to 20 splits as mapper input) and we have 5 executors,
each with 4 cores, will all these partitions be processed at the same time
(so each core processes one partition), or can one executor actually only
run one partition at a time?

I don't know whether my understanding is correct; please advise.

BTW: As a general practice, should we always try to set executor-cores to a
higher number? That is, should we favor 2 executors with 10 cores each over
10 executors with 2 cores each? Any suggestion here?

 

Thanks!

 

Regards,

 

Shuai



RE: Executor parameter doesn't work for Spark-shell on EMR Yarn

2015-01-15 Thread Shuai Zheng
I figured out the second question: if I don't pass in the number of partitions
for the test data, it will by default assume the max number of executors
(although I don't know what this default max is).

 

val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75",
  "-240990|161324,9051480,0,2,30.48,75"), 2)

will only trigger 2 executors.

 

So I think the default number of executors is decided by the first RDD
operation that needs to send work to executors. This gives me a weird way to
control the number of executors (run a fake/test piece of code to kick off the
executors first, then run the real workload, because the executors live for the
whole lifecycle of the application?), although this may not have any real value
in practice :)

 

But I still need help for my first question. 

 

Thanks a lot.

 

Regards,

 

Shuai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Thursday, January 15, 2015 4:03 PM
To: user@spark.apache.org
Subject: RE: Executor parameter doesn't work for Spark-shell on EMR Yarn

 

Forget to mention, I use EMR AMI 3.3.1, Spark 1.2.0. Yarn 2.4. The spark is
setup by the standard script:
s3://support.elasticmapreduce/spark/install-spark

 

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Thursday, January 15, 2015 3:52 PM
To: user@spark.apache.org
Subject: Executor parameter doesn't work for Spark-shell on EMR Yarn

 

Hi All,

 

I am testing Spark on EMR cluster. Env is a one node cluster r3.8xlarge. Has
32 vCore and 244G memory.

 

But the command line I use to start up spark-shell doesn't work. For
example:

 

~/spark/bin/spark-shell --jars
/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6
--executor-memory 10G

 

Neither num-executors nor memory setup works.

 

And more interesting, if I use test code:

val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75",
  "-240990|161324,9051480,0,2,30.48,75"))

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

 

It will start 32 executors (so I assume it tries to start an executor for
every vCore).

 

But if I use some real data to do it (the file size is 200M):

val lines = sc.textFile("s3://.../part-r-0")

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

It will only start 4 executors, which maps to the number of HDFS splits (200M
will have 4 splits).

 

So I have two questions:

1, Why is the setup parameter ignored by YARN? How can I limit the number of
executors I run?

2, Why does my much smaller test data set trigger 32 executors while my real
200M data set only gets 4 executors?

 

So how should I control the executor setup in spark-shell? I also printed
the SparkConf; it contains much less than I expect, and I don't see the
parameters I passed in there.

 

scala> sc.getConf.getAll.foreach(println)

(spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0)

(spark.app.id,local-1421353031552)

(spark.eventLog.enabled,true)

(spark.executor.id,driver)

(spark.repl.class.uri,http://10.181.82.38:58415)

(spark.driver.host,ip-10-181-82-38.ec2.internal)

(spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70)

(spark.app.name,Spark shell)

(spark.fileserver.uri,http://10.181.82.38:54666)

(spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk
-1.9.14.jar)

(spark.eventLog.dir,hdfs:///spark-logs)

(spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hado
op/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hado
op/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

(spark.master,local[*])

(spark.driver.port,54191)

(spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop
/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop
/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

 

I searched the old threads; the attached email answers the question of why the
vCore setup doesn't work. But I think that is not the same issue as mine.
Otherwise the default YARN Spark setup couldn't be adjusted at all?

 

Regards,

 

Shuai

 

 

 

 



RE: Why always spilling to disk and how to improve it?

2015-01-14 Thread Shuai Zheng
Thanks a lot!

 

I just realized that Spark is not really an in-memory version of MapReduce :)

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Tuesday, January 13, 2015 3:53 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: Why always spilling to disk and how to improve it?

 

You could try setting the following to tweak the application a little bit:

 

  .set("spark.rdd.compress", "true")
  .set("spark.storage.memoryFraction", "1")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

 

For shuffle behavior, you can look at this document 
https://spark.apache.org/docs/1.1.0/configuration.html#shuffle-behavior




Thanks

Best Regards

 

On Wed, Jan 14, 2015 at 1:51 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I am experimenting with a small data set. It is only 200M, and what I am doing is 
just a distinct count on it.

But there is a lot of spilling happening in the log (attached at the end of the 
email).

 

Basically I use 10G memory, run on a one-node EMR cluster with r3*8xlarge 
instance type (which has 244G memory and 32 vCPU).

 

My code is simple, run in the spark-shell (~/spark/bin/spark-shell 
--executor-cores 4 --executor-memory 10G)

 

val llg = sc.textFile("s3://…/part-r-0") // File is around 210.5M, 4.7M rows inside

//val llg = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75"))

val ids = llg.flatMap(line => line.split(",").slice(0, 1)) //Try to get the first column as key

val counts = ids.distinct.count

 

I think I should have enough memory, so there should not be any spilling. Can 
anyone give me some idea of why it happens, or where I can tune the system to 
reduce the spilling (it is not an issue on this dataset, but I want to see how 
to tune it)?

The Spark UI shows only 24.2MB of shuffle write. And if I have 10G memory for 
the executor, why does it need to spill?

 

2015-01-13 20:01:53,010 INFO  [sparkDriver-akka.actor.default-dispatcher-2] 
storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block 
broadcast_2_piece0

2015-01-13 20:01:53,011 INFO  [Spark Context Cleaner] spark.ContextCleaner 
(Logging.scala:logInfo(59)) - Cleaned broadcast 2

2015-01-13 20:01:53,399 INFO  [Executor task launch worker-5] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149 
spilling in-memory map of 23.4 MB to disk (3 times so far)

2015-01-13 20:01:53,516 INFO  [Executor task launch worker-7] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151 
spilling in-memory map of 23.4 MB to disk (3 times so far)

2015-01-13 20:01:53,531 INFO  [Executor task launch worker-6] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150 
spilling in-memory map of 23.2 MB to disk (3 times so far)

2015-01-13 20:01:53,793 INFO  [Executor task launch worker-4] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148 
spilling in-memory map of 23.4 MB to disk (3 times so far)

2015-01-13 20:01:54,460 INFO  [Executor task launch worker-5] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149 
spilling in-memory map of 23.2 MB to disk (4 times so far)

2015-01-13 20:01:54,469 INFO  [Executor task launch worker-7] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151 
spilling in-memory map of 23.2 MB to disk (4 times so far)

2015-01-13 20:01:55,144 INFO  [Executor task launch worker-6] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150 
spilling in-memory map of 24.2 MB to disk (4 times so far)

2015-01-13 20:01:55,192 INFO  [Executor task launch worker-4] 
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148 
spilling in-memory map of 23.2 MB to disk (4 times so far)

 

I am trying to collect more benchmark for next step bigger dataset and more 
complex logic.

 

Regards,

 

Shuai

 



Why always spilling to disk and how to improve it?

2015-01-13 Thread Shuai Zheng
Hi All,

 

I am experimenting with a small data set. It is only 200M, and what I am doing
is just a distinct count on it.

But there is a lot of spilling happening in the log (attached at the end of
the email).

 

Basically I use 10G memory, run on a one-node EMR cluster with r3*8xlarge
instance type (which has 244G memory and 32 vCPU).

 

My code is simple, run in the spark-shell (~/spark/bin/spark-shell
--executor-cores 4 --executor-memory 10G)

 

val llg = sc.textFile("s3://…/part-r-0") // File is around 210.5M, 4.7M rows inside

//val llg = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75"))

val ids = llg.flatMap(line => line.split(",").slice(0, 1)) //Try to get the first column as key

val counts = ids.distinct.count

 

I think I should have enough memory, so there should not be any spilling. Can
anyone give me some idea of why it happens, or where I can tune the system to
reduce the spilling (it is not an issue on this dataset, but I want to see how
to tune it)?

The Spark UI shows only 24.2MB of shuffle write. And if I have 10G memory for
the executor, why does it need to spill?

 

2015-01-13 20:01:53,010 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of
block broadcast_2_piece0

2015-01-13 20:01:53,011 INFO  [Spark Context Cleaner] spark.ContextCleaner
(Logging.scala:logInfo(59)) - Cleaned broadcast 2

2015-01-13 20:01:53,399 INFO  [Executor task launch worker-5]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149
spilling in-memory map of 23.4 MB to disk (3 times so far)

2015-01-13 20:01:53,516 INFO  [Executor task launch worker-7]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151
spilling in-memory map of 23.4 MB to disk (3 times so far)

2015-01-13 20:01:53,531 INFO  [Executor task launch worker-6]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150
spilling in-memory map of 23.2 MB to disk (3 times so far)

2015-01-13 20:01:53,793 INFO  [Executor task launch worker-4]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148
spilling in-memory map of 23.4 MB to disk (3 times so far)

2015-01-13 20:01:54,460 INFO  [Executor task launch worker-5]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149
spilling in-memory map of 23.2 MB to disk (4 times so far)

2015-01-13 20:01:54,469 INFO  [Executor task launch worker-7]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151
spilling in-memory map of 23.2 MB to disk (4 times so far)

2015-01-13 20:01:55,144 INFO  [Executor task launch worker-6]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150
spilling in-memory map of 24.2 MB to disk (4 times so far)

2015-01-13 20:01:55,192 INFO  [Executor task launch worker-4]
collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148
spilling in-memory map of 23.2 MB to disk (4 times so far)

 

I am trying to collect more benchmark for next step bigger dataset and more
complex logic.

 

Regards,

 

Shuai



Re: S3 files , Spark job hungsup

2014-12-22 Thread Shuai Zheng
Is it possible that too many connections are open to read from S3 from one
node? I had this issue before because I opened a few hundred files on S3 to
read from one node. It just blocked without error until a timeout later.

On Monday, December 22, 2014, durga durgak...@gmail.com wrote:

 Hi All,

 I am facing a strange issue sporadically. occasionally my spark job is
 hungup on reading s3 files. It is not throwing exception . or making some
 progress, it is just hungs up there.

 Is this a known issue , Please let me know how could I solve this issue.

 Thanks,
 -D



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/S3-files-Spark-job-hungsup-tp20806.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:;
 For additional commands, e-mail: user-h...@spark.apache.org javascript:;




Network file input cannot be recognized?

2014-12-21 Thread Shuai Zheng
Hi,

I am running code which takes a network file (not HDFS) location as
input. But sc.textFile("\\networklocation\README.md") can't recognize
a network location starting with "\\" as a valid location, because it only
accepts HDFS-style and local file name formats?

Does anyone have an idea how I can use a network file location as input to
create an RDD?

Regards,

Shuai


Find the file info of when load the data into RDD

2014-12-21 Thread Shuai Zheng
Hi All,

When I try to load a folder into an RDD, is there any way for me to find the
input file name for a particular partition, so I can track which file each
partition came from?

In Hadoop, I can find this information through the code:

FileSplit fileSplit = (FileSplit) context.getInputSplit();
String strFilename = fileSplit.getPath().getName();

But how can I do this in spark?

Regards,

Shuai


Re: Find the file info of when load the data into RDD

2014-12-21 Thread Shuai Zheng
I just found a possible answer:

http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/

Will give it a try. Although it is a bit troublesome, if it works it will give
me what I want.

Sorry to bother everyone here.

Regards,

Shuai
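For reference, a sketch of the approach from that post, assuming text input and the old mapred API; HadoopRDD.mapPartitionsWithInputSplit exposes the FileSplit for each partition (the path is illustrative):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

val raw = sc.hadoopFile("hdfs:///input/*",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Pair every line with the name of the file its split came from.
val linesWithFile = raw.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit({ (split: InputSplit, iter: Iterator[(LongWritable, Text)]) =>
    val fileName = split.asInstanceOf[FileSplit].getPath.getName
    iter.map { case (_, line) => (fileName, line.toString) }
  }, preservesPartitioning = true)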

On Sun, Dec 21, 2014 at 4:43 PM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,

 When I try to load a folder into the RDDs, any way for me to find the
 input file name of particular partitions? So I can track partitions from
 which file.

 In the hadoop, I can find this information through the code:

 FileSplit fileSplit = (FileSplit) context.getInputSplit();
 String strFilename = fileSplit.getPath().getName();

 But how can I do this in spark?

 Regards,

 Shuai



Any potentiail issue if I create a SparkContext in executor

2014-12-19 Thread Shuai Zheng
Hi All,

 

I notice that if we create a SparkContext in the driver, we need to call the
stop method to clear it.

 

SparkConf sparkConf = new SparkConf().setAppName("FinancialEngineExecutor");

JavaSparkContext ctx = new JavaSparkContext(sparkConf);

...

String inputPath = props[0].getProperty(Constants.S3_INPUT_FILES);

JavaRDD<String> lines = ctx.textFile(inputPath);

EngineFlatMapFunction engine = new EngineFlatMapFunction();

engine.setAnalysisConfiguraitons(props);

lines.mapPartitionsToPair(engine);

...

ctx.stop();

And I have the below code in the closure (EngineFlatMapFunction.java):

 

   Configuration hadoopConfiguration = new Configuration(new
JavaSparkContext(new SparkConf()).hadoopConfiguration());

 

Any issue there? I need to have the Hadoop Configuration in the closure, but
the Configuration class itself is not serializable, so I retrieve it on the
executor side.

Will there be any issue if I create the SparkContext in the above code without
calling stop on it?

 

Regards,

 

Shuai
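One hedged alternative, sketched in Scala, that avoids building a second SparkContext inside the closure: copy the needed entries out of the driver's Hadoop Configuration into a plain, serializable Map, broadcast that, and rebuild a Configuration on the executors. The key names, path and RDD name below are illustrative only.

import org.apache.hadoop.conf.Configuration

// On the driver: extract only what the closure needs, as plain strings.
val hadoopConf = sc.hadoopConfiguration
val neededKeys = Seq("fs.s3n.awsAccessKeyId", "fs.s3n.awsSecretAccessKey") // illustrative keys
val confEntries = neededKeys.flatMap(k => Option(hadoopConf.get(k)).map(k -> _)).toMap
val confBroadcast = sc.broadcast(confEntries)

val lines = sc.textFile("s3://bucket/input") // stand-in for the RDD built in the driver code above

// On the executors: rebuild a Configuration from the broadcast map.
val result = lines.mapPartitions { iter =>
  val conf = new Configuration()
  confBroadcast.value.foreach { case (k, v) => conf.set(k, v) }
  iter // ... use conf here ...
}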



RE: Control default partition when load a RDD from HDFS

2014-12-18 Thread Shuai Zheng
Hmmm, how to do that? You mean create an RDD for each file? Then I will have
tons of RDDs.

And my calculation needs to rely on other input, not just the file itself.

 

Can you show some pseudo code for that logic?

 

Regards,

 

Shuai

 

From: Diego García Valverde [mailto:dgarci...@agbar.es] 
Sent: Wednesday, December 17, 2014 11:04 AM
To: Shuai Zheng; 'Sun, Rui'; user@spark.apache.org
Subject: RE: Control default partition when load a RDD from HDFS

 

Why is it not a good option to create an RDD per 200Mb file and then apply
the pre-calculations before merging them? I think the partitions per RDD
should be transparent to the pre-calculations, rather than fixed just to
optimize the Spark map/reduce processes.

 

De: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Enviado el: miércoles, 17 de diciembre de 2014 16:01
Para: 'Sun, Rui'; user@spark.apache.org
Asunto: RE: Control default partition when load a RDD from HDFS

 

Nice, that is the answer I want. 

Thanks!

 

From: Sun, Rui [mailto:rui@intel.com] 
Sent: Wednesday, December 17, 2014 1:30 AM
To: Shuai Zheng; user@spark.apache.org
Subject: RE: Control default partition when load a RDD from HDFS

 

Hi, Shuai,

 

How did you turn off the file split in Hadoop? I guess you might have
implemented a customized FileInputFormat which overrides isSplitable() to
return FALSE. If you do have such FileInputFormat, you can simply pass it as
a constructor parameter to HadoopRDD or NewHadoopRDD in Spark.
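A rough sketch of such a format and of passing it to hadoopFile (WholeFileTextInputFormat is a made-up name, and the path is illustrative):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// A TextInputFormat that never splits a file, so one file = one partition.
class WholeFileTextInputFormat extends TextInputFormat {
  override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false
}

val perFileLines = sc.hadoopFile("hdfs:///input/*",
  classOf[WholeFileTextInputFormat], classOf[LongWritable], classOf[Text])
  .map { case (_, line) => line.toString }

// The number of partitions now equals the number of input files.
println(perFileLines.partitions.length)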

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Wednesday, December 17, 2014 4:16 AM
To: user@spark.apache.org
Subject: Control default partition when load a RDD from HDFS

 

Hi All,

 

My application loads 1000 files, each from 200M to a few GB, and combines them
with other data to do calculations.

Some pre-calculation must be done at the file level; after that, the results
need to be combined for further calculation.

In Hadoop this is simple, because I can turn off the file split for the input
format (to enforce that each file goes to the same mapper), then do the
file-level calculation in the mapper and pass the result to the reducer. But
how can I do that in Spark?

Basically I want to make sure that after I load these files into an RDD, it is
partitioned by file (no splitting of files and also no merging), so I can call
mapPartitions. Is there any way I can control the default partitioning when I
load the RDD?

This might already be the default behavior of Spark (partitioned by file when
the RDD is first loaded), but I can't find any document to support my guess;
if it is not, can I enforce this kind of partitioning? Because the total file
size is large, I don't want to re-partition in the code.

 

Regards,

 

Shuai

 




RE: Control default partition when load a RDD from HDFS

2014-12-17 Thread Shuai Zheng
Nice, that is the answer I want. 

Thanks!

 

From: Sun, Rui [mailto:rui@intel.com] 
Sent: Wednesday, December 17, 2014 1:30 AM
To: Shuai Zheng; user@spark.apache.org
Subject: RE: Control default partition when load a RDD from HDFS

 

Hi, Shuai,

 

How did you turn off the file split in Hadoop? I guess you might have
implemented a customized FileInputFormat which overrides isSplitable() to
return FALSE. If you do have such FileInputFormat, you can simply pass it as
a constructor parameter to HadoopRDD or NewHadoopRDD in Spark.

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Wednesday, December 17, 2014 4:16 AM
To: user@spark.apache.org
Subject: Control default partition when load a RDD from HDFS

 

Hi All,

 

My application loads 1000 files, each from 200M to a few GB, and combines them
with other data to do calculations.

Some pre-calculation must be done at the file level; after that, the results
need to be combined for further calculation.

In Hadoop this is simple, because I can turn off the file split for the input
format (to enforce that each file goes to the same mapper), then do the
file-level calculation in the mapper and pass the result to the reducer. But
how can I do that in Spark?

Basically I want to make sure that after I load these files into an RDD, it is
partitioned by file (no splitting of files and also no merging), so I can call
mapPartitions. Is there any way I can control the default partitioning when I
load the RDD?

This might already be the default behavior of Spark (partitioned by file when
the RDD is first loaded), but I can't find any document to support my guess;
if it is not, can I enforce this kind of partitioning? Because the total file
size is large, I don't want to re-partition in the code.

 

Regards,

 

Shuai



Control default partition when load a RDD from HDFS

2014-12-16 Thread Shuai Zheng
Hi All,

 

My application loads 1000 files, each from 200M to a few GB, and combines them
with other data to do calculations.

Some pre-calculation must be done at the file level; after that, the results
need to be combined for further calculation.

In Hadoop this is simple, because I can turn off the file split for the input
format (to enforce that each file goes to the same mapper), then do the
file-level calculation in the mapper and pass the result to the reducer. But
how can I do that in Spark?

Basically I want to make sure that after I load these files into an RDD, it is
partitioned by file (no splitting of files and also no merging), so I can call
mapPartitions. Is there any way I can control the default partitioning when I
load the RDD?

This might already be the default behavior of Spark (partitioned by file when
the RDD is first loaded), but I can't find any document to support my guess;
if it is not, can I enforce this kind of partitioning? Because the total file
size is large, I don't want to re-partition in the code.

 

Regards,

 

Shuai



RE: Any Replicated RDD in Spark?

2014-11-06 Thread Shuai Zheng
Matei,

Thanks for reply.

I don't worry that much about more code because I am migrating from MapReduce,
so I have existing code to handle it. But if I want to use a new technology, I
will always prefer the right way, not a temporary easy way! I will go with the
RDD approach first to test the performance.

Thanks!

Shuai

-Original Message-
From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
Sent: Wednesday, November 05, 2014 6:27 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: Any Replicated RDD in Spark?

If you start with an RDD, you do have to collect to the driver and broadcast
to do this. Between the two options you listed, I think this one is simpler
to implement, and there won't be a huge difference in performance, so you
can go for it. Opening InputStreams to a distributed file system by hand can
be a lot of code.

Matei

 On Nov 5, 2014, at 12:37 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 
 And another similar case:
 
 If I have get a RDD from previous step, but for next step it should be 
 a map side join (so I need to broadcast this RDD to every nodes). What 
 is the best way for me to do that? Collect RDD in driver first and 
 create broadcast? Or any shortcut in spark for this?
 
 Thanks!
 
 -Original Message-
 From: Shuai Zheng [mailto:szheng.c...@gmail.com]
 Sent: Wednesday, November 05, 2014 3:32 PM
 To: 'Matei Zaharia'
 Cc: 'user@spark.apache.org'
 Subject: RE: Any Replicated RDD in Spark?
 
 Nice.
 
 Then I have another question, if I have a file (or a set of files: 
 part-0, part-1, might be a few hundreds MB csv to 1-2 GB, created by 
 other program), need to create hashtable from it, later broadcast it 
 to each node to allow query (map side join). I have two options to do it:
 
 1, I can just load the file in a general code (open a inputstream, 
 etc), parse content and then create the broadcast from it.
 2, I also can use a standard way to create the RDD from these file, 
 run the map to parse it, then collect it as map, wrap the result as 
 broadcast to push to all nodes again.
 
 I think the option 2 might be more consistent with spark's concept 
 (and less code?)? But how about the performance? The gain is can 
 parallel load and parse the data, penalty is after load we need to 
 collect and broadcast result again? Please share your opinion. I am 
 not sure what is the best practice here (in theory, either way works, 
 but in real world, which one is better?).
 
 Regards,
 
 Shuai
 
 -Original Message-
 From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
 Sent: Monday, November 03, 2014 4:15 PM
 To: Shuai Zheng
 Cc: user@spark.apache.org
 Subject: Re: Any Replicated RDD in Spark?
 
 You need to use broadcast followed by flatMap or mapPartitions to do 
 map-side joins (in your map function, you can look at the hash table 
 you broadcast and see what records match it). Spark SQL also does it 
 by default for tables smaller than the 
 spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which 
 is really small, but you can bump this up with set
 spark.sql.autoBroadcastJoinThreshold=100 for example).
 
 Matei
 
 On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 
 Hi All,
 
 I have spent last two years on hadoop but new to spark.
 I am planning to move one of my existing system to spark to get some
 enhanced features.
 
 My question is:
 
 If I try to do a map side join (something similar to Replicated key 
 word
 in Pig), how can I do it? Is it anyway to declare a RDD as replicated
 (means distribute it to all nodes and each node will have a full copy)?
 
 I know I can use accumulator to get this feature, but I am not sure 
 what
 is the best practice. And if I accumulator to broadcast the data set, 
 can then (after broadcast) convert it into a RDD and do the join?
 
 Regards,
 
 Shuai
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Any Replicated RDD in Spark?

2014-11-05 Thread Shuai Zheng
Nice.

Then I have another question. If I have a file (or a set of files: part-0,
part-1, a few hundred MB of csv up to 1-2 GB, created by another program) and
need to create a hashtable from it, then broadcast it to each node to allow
lookups (map-side join), I have two options:

1, I can just load the file in general code (open an InputStream, etc.), parse
the content and then create the broadcast from it.
2, I can also use the standard way to create an RDD from these files, run a map
to parse it, then collect it as a map and wrap the result in a broadcast to
push to all nodes again.

I think option 2 might be more consistent with Spark's concepts (and less
code?). But how about the performance? The gain is that the load and parse can
run in parallel; the penalty is that after loading we need to collect and
broadcast the result again. Please share your opinion. I am not sure what the
best practice is here (in theory, either way works, but in the real world,
which one is better?).

Regards,

Shuai
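For what it is worth, a compact sketch of option 2 above (paths, field layout and key choice are assumptions):

// Option 2: build the lookup table with Spark, collect it, broadcast it,
// then do the map-side join against the big RDD.
val small = sc.textFile("hdfs:///lookup/part-*")
  .map { line => val f = line.split(","); (f(0), f(1)) }
val table = sc.broadcast(small.collectAsMap())

val big = sc.textFile("hdfs:///big/input")
val joined = big.mapPartitions { iter =>
  val lookup = table.value
  iter.flatMap { line =>
    val key = line.split(",")(0)
    lookup.get(key).map(v => (key, (line, v))) // keep only matching records
  }
}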

-Original Message-
From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
Sent: Monday, November 03, 2014 4:15 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: Any Replicated RDD in Spark?

You need to use broadcast followed by flatMap or mapPartitions to do
map-side joins (in your map function, you can look at the hash table you
broadcast and see what records match it). Spark SQL also does it by default
for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by
default 10 KB, which is really small, but you can bump this up with set
spark.sql.autoBroadcastJoinThreshold=100 for example).

Matei

 On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 
 Hi All,
 
 I have spent last two years on hadoop but new to spark.
 I am planning to move one of my existing system to spark to get some
enhanced features.
 
 My question is:
 
 If I try to do a map side join (something similar to Replicated key word
in Pig), how can I do it? Is it anyway to declare a RDD as replicated
(means distribute it to all nodes and each node will have a full copy)?
 
 I know I can use accumulator to get this feature, but I am not sure what
is the best practice. And if I accumulator to broadcast the data set, can
then (after broadcast) convert it into a RDD and do the join?
 
 Regards,
 
 Shuai



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Any Replicated RDD in Spark?

2014-11-05 Thread Shuai Zheng
And another similar case:

If I get an RDD from a previous step, but the next step should be a map-side
join (so I need to broadcast this RDD to every node), what is the best way for
me to do that? Collect the RDD in the driver first and create a broadcast? Or
is there any shortcut in Spark for this?

Thanks!

-Original Message-
From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Wednesday, November 05, 2014 3:32 PM
To: 'Matei Zaharia'
Cc: 'user@spark.apache.org'
Subject: RE: Any Replicated RDD in Spark?

Nice.

Then I have another question, if I have a file (or a set of files: part-0,
part-1, might be a few hundreds MB csv to 1-2 GB, created by other program),
need to create hashtable from it, later broadcast it to each node to allow
query (map side join). I have two options to do it:

1, I can just load the file in a general code (open a inputstream, etc),
parse content and then create the broadcast from it. 
2, I also can use a standard way to create the RDD from these file, run the
map to parse it, then collect it as map, wrap the result as broadcast to
push to all nodes again.

I think the option 2 might be more consistent with spark's concept (and less
code?)? But how about the performance? The gain is can parallel load and
parse the data, penalty is after load we need to collect and broadcast
result again? Please share your opinion. I am not sure what is the best
practice here (in theory, either way works, but in real world, which one is
better?). 

Regards,

Shuai

-Original Message-
From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
Sent: Monday, November 03, 2014 4:15 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: Any Replicated RDD in Spark?

You need to use broadcast followed by flatMap or mapPartitions to do
map-side joins (in your map function, you can look at the hash table you
broadcast and see what records match it). Spark SQL also does it by default
for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by
default 10 KB, which is really small, but you can bump this up with set
spark.sql.autoBroadcastJoinThreshold=100 for example).

Matei

 On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 
 Hi All,
 
 I have spent last two years on hadoop but new to spark.
 I am planning to move one of my existing system to spark to get some
enhanced features.
 
 My question is:
 
 If I try to do a map side join (something similar to Replicated key word
in Pig), how can I do it? Is it anyway to declare a RDD as replicated
(means distribute it to all nodes and each node will have a full copy)?
 
 I know I can use accumulator to get this feature, but I am not sure what
is the best practice. And if I accumulator to broadcast the data set, can
then (after broadcast) convert it into a RDD and do the join?
 
 Regards,
 
 Shuai



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Any Replicated RDD in Spark?

2014-11-03 Thread Shuai Zheng
Hi All,

I have spent the last two years on Hadoop but am new to Spark.
I am planning to move one of my existing systems to Spark to get some
enhanced features.

My question is:

If I try to do a map-side join (something similar to the Replicated keyword
in Pig), how can I do it? Is there any way to declare an RDD as replicated
(meaning distribute it to all nodes so that each node has a full copy)?

I know I can use an accumulator to get this feature, but I am not sure what
the best practice is. And if I use an accumulator to broadcast the data set,
can I then (after broadcasting) convert it into an RDD and do the join?

Regards,

Shuai


Memory limitation on EMR Node?

2014-11-03 Thread Shuai Zheng
Hi,

I am planning to run Spark on EMR, and my application might take a lot of
memory. On EMR, I know there is a hard limit of 16G physical memory on an
individual mapper/reducer (otherwise I get an exception; this is confirmed by
the AWS EMR team, at least as the spec at this moment).

If I use YARN on EMR and submit the Spark job to YARN, I assume YARN will take
responsibility for the resource allocation, so the limitation on physical
memory will still be 16G? Is that a reasonable guess, or does anyone have
experience using more than 16G of memory on EMR for an individual executor?

And I notice that there are some examples in the docs that allocate more than
16G of memory, so if I use a Spark cluster by itself, can I use more memory?

Regards,

Shuai