RE: Broadcasting a parquet file using spark and python
Hi Michael,

Thanks for the feedback. I am using version 1.5.2 now. Can you tell me how to enforce a broadcast join? I don't want to let the engine decide the execution path of the join; I want a hint or parameter to enforce a broadcast join (because I also have some cases that are inner joins where I still want a broadcast join). Or is there any ticket or roadmap for this feature?

Regards, Shuai

From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Saturday, December 05, 2015 4:11 PM To: Shuai Zheng Cc: Jitesh chandra Mishra; user Subject: Re: Broadcasting a parquet file using spark and python

I believe we started supporting broadcast outer joins in Spark 1.5. Which version are you using?

On Fri, Dec 4, 2015 at 2:49 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi all, Sorry to re-open this thread. I have a similar issue: one big parquet file left outer joined against quite a few smaller parquet files. But the run is extremely slow and sometimes even OOMs (with 300M). I have two questions here:

1. If I use an outer join, will Spark SQL automatically use a broadcast hash join?

2. If not: the latest documentation (http://spark.apache.org/docs/latest/sql-programming-guide.html) says: spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run.

How can I run ANALYZE TABLE in Java? I know I can code it myself (create a broadcast val and implement the lookup by hand), but that makes the code super ugly. I hope we can have either an API or a hint to enforce the hash join (instead of relying on the opaque autoBroadcastJoinThreshold parameter). Do we have any ticket or roadmap for this feature?
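For reference, Spark 1.5 does expose an explicit broadcast hint through org.apache.spark.sql.functions.broadcast, which marks a DataFrame for broadcast regardless of what the size statistics say. A minimal sketch (the table and column names here are made up for illustration):

```java
import static org.apache.spark.sql.functions.broadcast;
import org.apache.spark.sql.DataFrame;

// big: the large parquet DataFrame; small: one of the smaller lookup tables.
// Wrapping the small side in broadcast(...) hints the planner to use a
// broadcast (hash) join, including for inner joins, without relying on
// spark.sql.autoBroadcastJoinThreshold statistics.
DataFrame joined = big.join(
    broadcast(small),
    big.col("key").equalTo(small.col("key")),
    "left_outer");
```

The same call works for an inner join by dropping the third argument.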
Regards, Shuai

From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Wednesday, April 01, 2015 2:01 PM To: Jitesh chandra Mishra Cc: user Subject: Re: Broadcasting a parquet file using spark and python

You will need to create a Hive parquet table that points to the data and run "ANALYZE TABLE tableName noscan" so that we have statistics on the size.

On Tue, Mar 31, 2015 at 9:36 PM, Jitesh chandra Mishra <jitesh...@gmail.com> wrote:

Hi Michael, Thanks for your response. I am running 1.2.1. Is there any workaround to achieve the same with 1.2.1? Thanks, Jitesh

On Wed, Apr 1, 2015 at 12:25 AM, Michael Armbrust <mich...@databricks.com> wrote:

In Spark 1.3 I would expect this to happen automatically when the parquet table is small (< 10MB, configurable with spark.sql.autoBroadcastJoinThreshold). If you are running 1.3 and not seeing this, can you show the code you are using to create the table?

On Tue, Mar 31, 2015 at 3:25 AM, jitesh129 <jitesh...@gmail.com> wrote:

How can we implement a BroadcastHashJoin for Spark with Python? My SparkSQL inner joins are taking a lot of time since they perform a ShuffledHashJoin. The tables being joined are stored as parquet files. Please help. Thanks and regards, Jitesh

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
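On the "how do I run ANALYZE TABLE in Java" question above: with a HiveContext the statement can be issued directly as SQL. A hedged sketch, assuming the parquet data is already registered as a Hive table named lookup_table (a hypothetical name):

```java
import org.apache.spark.sql.hive.HiveContext;

// ANALYZE TABLE ... COMPUTE STATISTICS noscan writes the table size into the
// Hive metastore; spark.sql.autoBroadcastJoinThreshold is compared against
// that recorded size when deciding whether to broadcast the table.
HiveContext hive = new HiveContext(ctx.sc()); // ctx is the JavaSparkContext
hive.sql("ANALYZE TABLE lookup_table COMPUTE STATISTICS noscan");
```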
RE: Spark Tasks on second node never return in Yarn when I have more than 1 task node
Hi All,

Just an update on this case. I tried many different combinations of settings (and I just upgraded to the latest EMR 4.2.0 with Spark 1.5.2). I found out that the problem comes from: spark-submit --deploy-mode client --executor-cores=24 --driver-memory=5G --executor-memory=45G

If I set --executor-cores=20 (or anything less than 20), there is no issue. This is a quite interesting case, because the instance (c3.8xlarge) has 32 virtual cores and can run without any issue with one task node. So I guess the issue comes from: 1. a connection limit from the EC2 instance on EMR to S3 (this reason doesn't make enough sense to me; I will contact EMR support to clarify), or 2. some library packed in the jar causing this limit (also not very reasonable). Reporting here in case anyone faces a similar issue.

Regards, Shuai

From: Jonathan Kelly [mailto:jonathaka...@gmail.com] Sent: Thursday, November 19, 2015 6:54 PM To: Shuai Zheng Cc: user Subject: Re: Spark Tasks on second node never return in Yarn when I have more than 1 task node

I don't know if this actually has anything to do with why your job is hanging, but since you are using EMR you should probably not set those fs.s3 properties but rather let it use EMRFS, EMR's optimized Hadoop FileSystem implementation for interacting with S3. One benefit is that it will automatically pick up your AWS credentials from your EC2 instance role rather than you having to configure them manually (since doing so is insecure because you have to get the secret access key onto your instance). If simply making that change does not fix the issue, a jstack of the hung process would help you figure out what it is doing. You should also look at the YARN container logs (which automatically get uploaded to your S3 logs bucket if you have this enabled). ~ Jonathan

On Thu, Nov 19, 2015 at 1:32 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi All, I face a very weird case. I have simplified the scenario as much as possible so everyone can replay it.
My env: AWS EMR 4.1.0, Spark 1.5. My code runs without any problem in local mode, and it has no problem on an EMR cluster with one master and one task node. But when I try to run with more than one task node (i.e. a 3-node cluster), the tasks on one of the nodes never return. The log is as below:

15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

So you can see the tasks are submitted alternately to two instances, ip-10-165-121-188 and ip-10-155-160-147. Only the tasks running on ip-10-165-121-188.ec2 finish; the tasks on ip-10-155-160-147.ec2 just wait there and never return. The data and code have been tested in local mode and in single-node cluster mode, so it should not be an issue with the logic or the data.
And I have attached my test case here (I believe it is simple enough, and no business logic is involved):

public void createSiteGridExposure2() {
    JavaSparkContext ctx = this.createSparkContextTest("Test");
    ctx.textFile(siteEncodeLocation).flatMapToPair(new PairFlatMapFunction<String, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(String line) throws Exception {
            List<Tuple2<String, String>> res = new ArrayList<Tuple2<String, String>>();
            return res;
        }
    }).collectAsMap();
    ctx.stop();
}

protected JavaSparkContext createSparkContextTest(String appName) {
    SparkConf sparkConf = new SparkConf().setAppName(appName);
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = ctx.hadoopConfiguration();
    if (awsAccessKeyId != null) {
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
        hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
        hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
    }
    return ctx;
}

Anyone have any idea why this happened? I am a bit lost, because the code works in local mode and on a 2-node (1 master, 1 task) cluster, but when it moves to a cluster with multiple task nodes, I have this issue. No error, no exception, not even a timeout (I waited more than an hour and there was no timeout either). Regards, Shuai
RE: [Spark 1.5]: Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space -- Works in 1.4, but not in 1.5
An update: this ONLY happens in Spark 1.5. I tried to run it under Spark 1.4 and 1.4.1 and there is no issue (the program was developed under Spark 1.4 originally, and I just re-tested it; it works). So this proves there is no issue with the logic or the data; it is caused by the new version of Spark. So I want to know: is there any new setting I should use in Spark 1.5 to make it work?

Regards, Shuai

From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, November 04, 2015 3:22 PM To: user@spark.apache.org Subject: [Spark 1.5]: Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space

Hi All, I have a program which runs a somewhat complex job (joins) in Spark, and I get the exception below. I am running on Spark 1.5 with the parameters: spark-submit --deploy-mode client --executor-cores=24 --driver-memory=2G --executor-memory=45G --class .

Some other setup:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max", "2047m");
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps").set("spark.sql.autoBroadcastJoinThreshold", "104857600");

This is running on an AWS c3.8xlarge instance. I am not sure what parameters I should set when I get the OutOfMemoryError exception below.

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 10181"...
Exception in thread "broadcast-hash-join-2" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:380)
at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:123)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:95)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:85)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:85)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any hint would be very helpful. Regards, Shuai
How to Take the whole file as a partition
Hi All,

I have 1000 files, from 500M to 1-2GB each at the moment, and I want Spark to read them as partitions at the file level, which means I want the FileSplit behavior turned off. I know there are some solutions, but none are very good in my case:

1. I can't use the wholeTextFiles method, because my files are too big and I don't want to risk the performance.

2. I tried to use newAPIHadoopFile and turn off the file split:

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class, LongWritable.class, Text.class, hadoopConf).values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

This works in some cases, but it truncates some lines (I am not sure why, but it looks like there is a limit on the file reading). I have a feeling that Spark truncates the file at 2GB. In any case it happens (the same data has no issue when I use MapReduce for input): Spark sometimes truncates a very big file when it tries to read all of it.

3. Another way is to distribute the file names as the input to Spark and open a stream inside a function to read each file directly. This is what I am planning to do, but I think it is ugly.

Does anyone have a better solution? BTW: the files are currently in text format, but they might be in parquet format later, which is also why I don't like my third option.

Regards, Shuai
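Option 3 above (distributing the file names and reading each file inside a task) can be sketched roughly as below; listFileNames and readWholeFile are hypothetical helpers standing in for whatever listing and stream-reading code fits the storage layer:

```java
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// One partition per file: parallelize the names with numSlices equal to the
// file count, then open and read each file inside the task, so no InputFormat
// split logic is involved at all.
List<String> names = listFileNames();              // hypothetical helper
JavaRDD<String> contents = ctx.parallelize(names, names.size())
    .map(new Function<String, String>() {
        @Override
        public String call(String path) throws Exception {
            return readWholeFile(path);            // hypothetical helper: open a stream, read fully
        }
    });
```

Note that holding a whole 1-2GB file as a single String per task keeps the same memory pressure concern that rules out wholeTextFiles, so a streaming parse inside the task may be preferable.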
RE: How to Take the whole file as a partition
Hi,

Is there any way to change the default split size when loading data in Spark? By default it is 64M. I know how to change this in Hadoop MapReduce, but I'm not sure how to do it in Spark.

Regards, Shuai

From: Tao Lu [mailto:taolu2...@gmail.com] Sent: Thursday, September 03, 2015 11:07 AM To: Shuai Zheng Cc: user Subject: Re: How to Take the whole file as a partition

Your situation is special. It seems to me that Spark may not fit well in your case. You want to process the individual files (500M~2G) as a whole, and you want good performance. You may want to write your own Scala/Java programs, distribute them along with those files across your cluster, and run them in parallel. If you insist on using Spark, maybe option 3 is closest. Cheers, Tao

On Thu, Sep 3, 2015 at 10:22 AM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi All, I have 1000 files, from 500M to 1-2GB each at the moment, and I want Spark to read them as partitions at the file level, which means I want the FileSplit behavior turned off. I know there are some solutions, but none are very good in my case:

1. I can't use the wholeTextFiles method, because my files are too big and I don't want to risk the performance.

2. I tried to use newAPIHadoopFile and turn off the file split:

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class, LongWritable.class, Text.class, hadoopConf).values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

This works in some cases, but it truncates some lines (I am not sure why, but it looks like there is a limit on the file reading). I have a feeling that Spark truncates the file at 2GB. In any case it happens (the same data has no issue when I use MapReduce for input): Spark sometimes truncates a very big file when it tries to read all of it.

3. Another way is to distribute the file names as the input to Spark and open a stream inside a function to read each file directly.
This is what I am planning to do, but I think it is ugly. Does anyone have a better solution? BTW: the files are currently in text format, but they might be in parquet format later, which is also why I don't like my third option. Regards, Shuai

--
Thanks! Tao
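On the split-size question opening this message: the same Hadoop input-format properties used in MapReduce can be set on the Hadoop Configuration that Spark passes to newAPIHadoopFile. A sketch (untested against this exact setup): raising the minimum split size above the largest input file effectively gives one split, and therefore one partition, per file.

```java
import org.apache.hadoop.conf.Configuration;

// Set before calling ctx.newAPIHadoopFile(...). 4GB is larger than any of the
// 500M-2GB input files mentioned above, so each file becomes a single split.
Configuration hadoopConf = ctx.hadoopConfiguration();
long minSplit = 4L * 1024L * 1024L * 1024L; // 4 GB
hadoopConf.set("mapreduce.input.fileinputformat.split.minsize", String.valueOf(minSplit));
hadoopConf.set("mapred.min.split.size", String.valueOf(minSplit)); // old-API property name
```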
org.apache.hadoop.security.AccessControlException: Permission denied when access S3
Hi All,

I am trying to read a file from S3 through the Hadoop file API. Below is my code:

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.awsAccessKeyId", this.getAwsAccessKeyId());
hadoopConf.set("fs.s3n.awsSecretAccessKey", this.getAwsSecretAccessKey());
lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class, LongWritable.class, Text.class, hadoopConf).values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

And I get the below error:

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: s3n://
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)

The permissions should not be a problem (because I can use ctx.textFile without any issue), so the issue comes from the call to newAPIHadoopFile. Is there anything else I need to set up for this? Regards, Shuai
RE: [SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyI
I have tried both cases (s3 and s3n, setting all possible parameters), and trust me, the same code works with 1.3.1 but not with 1.3.0, 1.4.0, or 1.5.0. I even used a plain project to test this and used Maven to include all referenced libraries, but it still gives me the error. I think anyone can easily replicate my issue locally (the code doesn't need to run on EC2; I run it directly from my local Windows PC).

Regards, Shuai

From: Aaron Davidson [mailto:ilike...@gmail.com] Sent: Wednesday, June 10, 2015 12:28 PM To: Shuai Zheng Subject: Re: [SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively)

That exception is a bit weird, as it refers to fs.s3 instead of fs.s3n. Maybe you are accidentally using s3://? Otherwise, you might try also specifying that property too.

On Jun 9, 2015 12:45 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi All, I have some code to access S3 from Spark. The code is as simple as:

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", "---");
hadoopConf.set("fs.s3n.awsSecretAccessKey", "--");
SQLContext sql = new SQLContext(ctx);
DataFrame grid_lookup = sql.parquetFile("s3n://---");
grid_lookup.count();
ctx.stop();

The code works in 1.3.1. In 1.4.0 and the latest 1.5.0, it always gives me the below exception:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
I don't know why; I remember this was a known issue in 1.3.0 (https://issues.apache.org/jira/browse/SPARK-6330) and was solved in 1.3.1. But now it is not working again in a newer version? I remember that when I switched to 1.4.0 it worked for a while (while I was working against the master branch of the latest source code), but I just refreshed to the latest code and I am getting this error again. Anyone have any idea? Regards, Shuai
[SQL][1.3.1][JAVA]Use UDF in DataFrame join
Hi All,

I want to ask how to use a UDF in a join on a DataFrame. It always gives me a "cannot resolve column name" error. Can anyone give me an example of how to do this in Java? My code is like:

edmData.join(yb_lookup, edmData.col("year(YEARBUILT)").equalTo(yb_lookup.col("yb_class_vdm")));

But it won't work in Java. I understand that the col function should only take a column name, but is there a way to use a simple expression in the join condition?

Regards, Shuai
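One possible workaround (a sketch on my part, not something confirmed in the thread) is to materialize the UDF result as a real column first with withColumn, then join on plain column names. Note the name of the call helper changed across versions (callUdf appeared around Spark 1.4 and was later renamed callUDF), so check the functions API of the version in use:

```java
import static org.apache.spark.sql.functions.callUdf;
import org.apache.spark.sql.DataFrame;

// col(...) accepts only a column name, so compute year(YEARBUILT) into its own
// column ("yb_year" is a made-up name) and join on that column instead of
// embedding the expression inside col(...).
DataFrame withYear = edmData.withColumn("yb_year",
    callUdf("year", edmData.col("YEARBUILT")));
DataFrame joined = withYear.join(
    yb_lookup,
    withYear.col("yb_year").equalTo(yb_lookup.col("yb_class_vdm")));
```

Registering both DataFrames as temp tables and writing the join in sql() sidesteps the issue as well, which is what the follow-up message in this archive does.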
[SQL][1.3.1][JAVA] UDF in java cause Task not serializable
Hi All,

Basically I try to define a simple UDF and use it in a query, but it gives me "Task not serializable":

public void test() {
    RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
    RiskGroupModelDefinition edm = this.createEdm();
    JavaSparkContext ctx = this.createSparkContext();
    SQLContext sql = new SQLContext(ctx);
    sql.udf().register("year", new UDF1<Date, Integer>() {
        @Override
        public Integer call(Date d) throws Exception {
            return d.getYear();
        }
    }, new org.apache.spark.sql.types.IntegerType());
    /** Retrieve all tables for EDM */
    DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
    property.registerTempTable("p");
    DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
    yb_lookup.registerTempTable("yb");
    sql.sql("select * from p left join yb on year(p.YEARBUILT)=yb.yb_class_vdm").count();
    ctx.stop();
}

If I remove the UDF and just use p.YEARBUILT=yb.yb_class_vdm, the SQL runs without any problem. But after I add the UDF to the query (as in the code above), the exception is as below:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#43L]
   Project []
    HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
     Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
      Project [YEARBUILT#14]
       Filter ISVALID#18
        PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
     Exchange (HashPartitioning [yb_class_vdm#40L], 200)
      PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)
at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(VulnabilityEncodeExecutor.java:137)
at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(VulnabilityEncodeExecutor.java:488)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
 Aggregate true, [], [COUNT(1) AS PartialCount#43L]
  Project []
   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
    Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
     Project [YEARBUILT#14]
      Filter ISVALID#18
       PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
    Exchange (HashPartitioning [yb_class_vdm#40L], 200)
     PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
... 6 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate true, [], [COUNT(1) AS PartialCount#43L]
 Project []
  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
    Project [YEARBUILT#14]
     Filter ISVALID#18
      PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
   Exchange (HashPartitioning [yb_class_vdm#40L], 200)
    PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:101)
at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
... 10 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange
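A likely cause here (an assumption on my part, not confirmed in the thread): the anonymous UDF1 is declared inside the executor object, and in Java an anonymous inner class that touches enclosing state keeps a hidden reference to its enclosing instance, so serializing the UDF drags the whole non-serializable executor along. Defining the UDF as a static nested class avoids the hidden reference. A self-contained sketch of the mechanism in plain Java (no Spark involved; class names are made up):

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Stands in for the executor class that owns the SparkContext: NOT serializable.
    public static class Outer {
        private int id = 42;

        public Serializable anonymousUdf() {
            // Anonymous inner class touching enclosing state: captures Outer.this.
            return new Serializable() {
                int copy = id;
            };
        }
    }

    // The fix: a static nested class carries no reference to an enclosing instance.
    public static class YearUdf implements Serializable {
        public Integer call(java.sql.Date d) {
            return d.getYear() + 1900; // java.util.Date.getYear() is offset from 1900
        }
    }

    // Round-trips an object through Java serialization; false on failure.
    public static boolean serializes(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (Exception e) {
            return false; // NotSerializableException via the hidden Outer reference
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Outer().anonymousUdf())); // false
        System.out.println(serializes(new YearUdf()));              // true
    }
}
```

With the static class in place, registration would look like sql.udf().register("year", ...) with an instance of the static class, so the closure no longer pulls in the enclosing object.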
RE: Slower performance when bigger memory?
Thanks. So may I know your configuration for more/smaller executors on r3.8xlarge — how much memory did you eventually decide to give one executor without impacting performance (for example, 64g)?

From: Sven Krasser [mailto:kras...@gmail.com] Sent: Friday, April 24, 2015 1:59 PM To: Dean Wampler Cc: Shuai Zheng; user@spark.apache.org Subject: Re: Slower performance when bigger memory?

FWIW, I ran into a similar issue on r3.8xlarge nodes and opted for more/smaller executors. Another observation was that one large executor results in less overall read throughput from S3 (using Amazon's EMRFS implementation), in case that matters to your application. -Sven

On Thu, Apr 23, 2015 at 10:18 AM, Dean Wampler deanwamp...@gmail.com wrote: JVMs often have significant GC overhead with heaps bigger than 64GB. You might try your experiments with configurations below this threshold. dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am running some benchmarks on an r3.8xlarge instance. I have a cluster with one master (no executor on it) and one slave (r3.8xlarge). My job has 1000 tasks in stage 0. The r3.8xlarge has 244GB memory and 32 cores. If I create 4 executors, each with 8 cores and 50GB memory, each task takes around 320s-380s. If I instead use one big executor with 32 cores and 200GB memory, each task takes 760s-900s. Checking the logs, it looks like minor GC takes much longer when using 200GB memory:

285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 sys=120.65, real=11.25 secs]

When it uses 50GB memory, a minor GC takes less than 1s. I am trying to see what the best way to configure Spark is.

For some special reason, I am tempted to use bigger memory on a single executor if there is no significant performance penalty. But now it looks like there is? Anyone have any idea? Regards, Shuai

-- www.skrasser.com http://www.skrasser.com/?utm_source=sig
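The sizing trade-off discussed above (several medium executors instead of one giant heap) maps directly to submit-time flags. A hedged sketch for Spark 1.x on YARN — the class and jar names are placeholders, and --num-executors is a YARN option (a standalone cluster would instead split the worker via SPARK_WORKER_INSTANCES):

```shell
# Four executors per r3.8xlarge, each 8 cores / 50g: keeps every heap
# well under the ~64GB range where the minor-GC pauses above ballooned.
spark-submit \
  --master yarn-client \
  --num-executors 4 \
  --executor-cores 8 \
  --executor-memory 50g \
  --class com.example.Benchmark \
  benchmark.jar
```

The slower configuration measured above is the same command with --num-executors 1, --executor-cores 32, --executor-memory 200g.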
Slower performance when bigger memory?
Hi All, I am running some benchmarks on an r3.8xlarge instance. I have a cluster with one master (no executor on it) and one slave (r3.8xlarge). My job has 1000 tasks in stage 0. The r3.8xlarge has 244GB memory and 32 cores. If I create 4 executors, each with 8 cores and 50GB memory, each task takes around 320s-380s. If I instead use one big executor with 32 cores and 200GB memory, each task takes 760s-900s. Checking the logs, it looks like minor GC takes much longer when using 200GB memory:

285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 sys=120.65, real=11.25 secs]

When it uses 50GB memory, a minor GC takes less than 1s. I am trying to see what the best way to configure Spark is. For some special reason, I am tempted to use bigger memory on a single executor if there is no significant performance penalty. But now it looks like there is? Anyone have any idea? Regards, Shuai
RE: Bug? Can't reference to the column by name after join two DataFrame on a same name key
Got it. Thanks! :)

From: Yin Huai [mailto:yh...@databricks.com] Sent: Thursday, April 23, 2015 2:35 PM To: Shuai Zheng Cc: user Subject: Re: Bug? Can't reference to the column by name after join two DataFrame on a same name key

Hi Shuai, You can use "as" to create a table alias. For example, df1.as("df1"). Then you can use $"df1.col" to refer to it. Thanks, Yin

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I use 1.3.1. When I have two DFs and join them on a same-name key, afterwards I can't get the common key by name. Basically: select * from t1 inner join t2 on t1.col1 = t2.col1. And I am using pure DataFrame and Spark SQLContext, not HiveContext:

DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col");

Because df1 and df2 join on the same key "col", I can't then reference the key "col". I understand I should use a fully qualified name for that column (like t1.col in SQL), but I don't know how to express this in Spark SQL.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#8L, id#0L.;

It looks like the joined key can't be referenced by name or by the df1.col(name) pattern. https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I am not sure whether it is the same issue, but I still see the problem in the latest code. It looks like the result after a join doesn't keep the parent DF information anywhere? I checked https://issues.apache.org/jira/browse/SPARK-6273 but am not sure whether it is the same issue. Should I open a new ticket for this? Regards, Shuai
Bug? Can't reference to the column by name after join two DataFrame on a same name key
Hi All, I use 1.3.1. When I have two DFs and join them on a same-name key, afterwards I can't get the common key by name. Basically: select * from t1 inner join t2 on t1.col1 = t2.col1. And I am using pure DataFrame and Spark SQLContext, not HiveContext:

DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col");

Because df1 and df2 join on the same key "col", I can't then reference the key "col". I understand I should use a fully qualified name for that column (like t1.col in SQL), but I don't know how to express this in Spark SQL.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#8L, id#0L.;

It looks like the joined key can't be referenced by name or by the df1.col(name) pattern. https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I am not sure whether it is the same issue, but I still see the problem in the latest code. It looks like the result after a join doesn't keep the parent DF information anywhere? I checked https://issues.apache.org/jira/browse/SPARK-6273 but am not sure whether it is the same issue. Should I open a new ticket for this? Regards, Shuai
RE: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
Below is my code to access s3n without problems (only for 1.3.1 — there is a bug in 1.3.0):

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);

Regards, Shuai

From: Sujee Maniyam [mailto:su...@sujee.net] Sent: Wednesday, April 22, 2015 12:45 PM To: Spark User List Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

Hi all, I am unable to access s3n:// urls using sc.textFile() — I am getting a 'no file system for scheme s3n' error. A bug, or some conf settings missing? See below for details.

env variables: AWS_SECRET_ACCESS_KEY=set AWS_ACCESS_KEY_ID=set
spark/RELEASE: Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034

./bin/spark-shell
val f = sc.textFile("s3n://bucket/file")
f.count

error ==
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at ...
RE: RE: maven compile error
I have a similar issue (I failed on the Spark Core project, but with the same exception as you). Then I followed the steps below (I am working on Windows): delete the local Maven repository and re-download all dependencies. The issue sounds like a corrupted jar that can't be opened by Maven. Other than this, I also did the following (I don't think these are the solution; I just describe my steps):
1. Uninstall the Scala 2.11 version (I had one installed), so that I only have 2.10.5 on my PC.
2. Upgrade Maven to the latest 3.3.1.
3. Install the latest git client.
Regards, Shuai

From: myelinji [mailto:myeli...@aliyun.com] Sent: Friday, April 03, 2015 6:58 AM To: Sean Owen Cc: Spark user list Subject: Reply: maven compile error

Thank you for your reply. When I use maven to compile the whole project, the errors are as follows:

[INFO] Spark Project Parent POM .................. SUCCESS [4.136s]
[INFO] Spark Project Networking .................. SUCCESS [7.405s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [5.071s]
[INFO] Spark Project Core ........................ SUCCESS [3:08.445s]
[INFO] Spark Project Bagel ....................... SUCCESS [21.613s]
[INFO] Spark Project GraphX ...................... SUCCESS [58.915s]
[INFO] Spark Project Streaming ................... SUCCESS [1:26.202s]
[INFO] Spark Project Catalyst .................... FAILURE [1.537s]
[INFO] Spark Project SQL ......................... SKIPPED
[INFO] Spark Project ML Library .................. SKIPPED
[INFO] Spark Project Tools ....................... SKIPPED
[INFO] Spark Project Hive ........................ SKIPPED
[INFO] Spark Project REPL ........................ SKIPPED
[INFO] Spark Project Assembly .................... SKIPPED
[INFO] Spark Project External Twitter ............ SKIPPED
[INFO] Spark Project External Flume Sink ......... SKIPPED
[INFO] Spark Project External Flume .............. SKIPPED
[INFO] Spark Project External MQTT ............... SKIPPED
[INFO] Spark Project External ZeroMQ ............. SKIPPED
[INFO] Spark Project External Kafka .............. SKIPPED
[INFO] Spark Project Examples .................... SKIPPED

It seems like there is something wrong with the Catalyst project. Why can I not compile this project?

--
From: Sean Owen so...@cloudera.com Sent: Friday, April 3, 2015, 17:48 To: myelinji myeli...@aliyun.com Cc: Spark user list user@spark.apache.org Subject: Re: maven compile error

If you're asking about a compile error, you should include the command you used to compile. I am able to compile branch 1.2 successfully with "mvn -DskipTests clean package". This error is actually an error from scalac, not a compile error from the code. It sort of sounds like it has not been able to download the Scala dependencies. Check or maybe recreate your environment.

On Fri, Apr 3, 2015 at 3:19 AM, myelinji myeli...@aliyun.com wrote: Hi all: Just now I checked out spark-1.2 on GitHub and wanted to build it with Maven; however, I encountered an error during compilation:

[INFO] [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-catalyst_2.10: wrap: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found. -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-catalyst_2.10: wrap: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found.
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at ...
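The recovery steps described in the reply above (wipe the local repository so Maven re-fetches the possibly corrupted jars, then rebuild) look like this as commands; the repository path is the default location and an assumption about your setup:

```shell
# delete the local Maven repository so every dependency is re-downloaded
rm -rf ~/.m2/repository
# rebuild from the root of the Spark checkout
# (the command Sean reports compiling branch 1.2 successfully)
mvn -DskipTests clean package
```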
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] when creating context
Hi All, In some cases I get the exception below when I run Spark in local mode (I haven't seen this in a cluster). This is weird, and it also affects my local unit test cases (it does not always happen, but usually once per 4-5 runs). From the stack, it looks like the error happens when creating the context, but I don't know why, or what kind of parameters I could set to solve this issue.

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:180)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:270)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at com.***.executor.FinancialEngineExecutor.run(FinancialEngineExecutor.java:110)

Regards, Shuai
RE: --driver-memory parameter doesn't work for spark-submit on yarn?
Sorry for the late reply. I bypassed this by setting _JAVA_OPTIONS. Here is the ps aux | grep spark output:

hadoop 14442 0.6 0.2 34334552 128560 pts/0 Sl+ 14:37 0:01 /usr/java/latest/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --driver-memory=5G --executor-memory=10G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/Engine-2.0-jar-with-dependencies.jar

hadoop 14544 158 13.4 37206420 8472272 pts/0 Sl+ 14:37 4:21 /usr/java/latest/bin/java -cp /home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar::/home/hadoop/spark/conf:/home/hadoop/spark/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/home/hadoop/spark/lib/datanucleus-core-3.2.10.jar:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/hadoop/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/hadoop/conf:/home/hadoop/conf -XX:MaxPermSize=128m -Dspark.driver.log.level=INFO -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --driver-memory=5G --executor-memory=10G --master yarn-client --class com.*executor.FinancialEngineExecutor /home/hadoop/lib/MiddlewareEngine-2.0-jar-with-dependencies.jar

Above, _JAVA_OPTIONS=-Xmx30g was already set, but it doesn't show up in the command line. I guess SparkSubmit reads _JAVA_OPTIONS, but I would expect it to be overridden by the command-line params. Not sure what happened here; I have no time to dig into it, but if you want me to provide more information, I will be happy to do that. Regards, Shuai

-----Original Message----- From: Bozeman, Christopher [mailto:bozem...@amazon.com] Sent: Wednesday, April 01, 2015 4:59 PM To: Shuai Zheng; 'Sean Owen' Cc: 'Akhil Das'; user@spark.apache.org Subject: RE: --driver-memory parameter doesn't work for spark-submit on yarn?

Shuai, What did ps aux | grep spark-submit reveal? When you compare using _JAVA_OPTIONS and without using it, where do you see the difference?

Thanks Christopher

-----Original Message----- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, April 01, 2015 11:12 AM To: 'Sean Owen' Cc: 'Akhil Das'; user@spark.apache.org Subject: RE: --driver-memory parameter doesn't work for spark-submit on yarn?

Nice. But my case shows the same issue even when I use yarn-client; I verified it several times. And I am running 1.3.0 on EMR (using the version dispatched by the installSpark script from AWS). I agree _JAVA_OPTIONS is not the right solution, but I will use it until 1.4.0 is out :) Regards, Shuai

-----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 01, 2015 10:51 AM To: Shuai Zheng Cc: Akhil Das; user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submit on yarn?

I feel like I recognize that problem, and it's almost the inverse of https://issues.apache.org/jira/browse/SPARK-3884, which I was looking at today. The spark-class script didn't seem to handle all the ways that driver memory can be set. I think this is also something fixed by the new launcher library in 1.4.0. _JAVA_OPTIONS is not a good solution since it's global.

On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Akhil, Thanks a lot! After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this leaves me confused: so the --driver-memory option doesn't work for spark-submit to YARN (I haven't checked other clusters)? Is it a bug? Regards, Shuai

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submit on yarn?

Once you submit the job, do a ps aux | grep spark-submit and see how much heap space is allocated to the process (the -Xmx params); if you are seeing a lower value you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards

On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is my shell script:

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties

My driver will load some resources and then broadcast them to all executors. The resources are only 600MB in serialized form, but I always get an out-of-memory exception; it looks like the right amount of memory isn't being allocated to my driver.

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0 ...
RE: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer
I have found the issue, but I think it is a bug. If I change my class to:

public class ModelSessionBuilder implements Serializable {
    ...
    private Properties[] propertiesList;
    private static final long serialVersionUID = -8139500301736028670L;
}

the broadcast value has no issue. But in my original form, if I broadcast it as an array of my custom subclass of Properties, then after the broadcast the propertiesList array is an array of empty PropertiesUtils objects (empty, not null). I am not sure why this happens (the code runs without any problem with the default Java serializer). So I think this is a bug, but I am not sure whether it is a bug in Spark or a bug in Kryo. Regards, Shuai

From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, April 06, 2015 5:34 PM To: user@spark.apache.org Subject: Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

Hi All, I have tested my code without problems on EMR YARN (Spark 1.3.0) with the default (Java) serializer. But when I switch to org.apache.spark.serializer.KryoSerializer, the broadcast value doesn't give me the right result (it actually returns empty custom classes in the inner object). Basically, I broadcast a builder object, which carries an array of PropertiesUtils objects. The code should not have any logical issue, because it works with the default Java serializer. But when I turn on the KryoSerializer, it looks like the array isn't populated: propertiesList has the right size, but every element in the array is just a plain empty PropertiesUtils. Am I missing anything when I use the KryoSerializer? I just added the two lines below — do I need to implement some special code to enable KryoSerializer? I searched but can't find any place that mentions it.

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class[]{ModelSessionBuilder.class, Constants.class, PropertiesUtils.class, ModelSession.class});

public class ModelSessionBuilder implements Serializable {
    ...
    private PropertiesUtils[] propertiesList;
    private static final long serialVersionUID = -8139500301736028670L;
}

public class PropertiesUtils extends Properties {
    private static final long serialVersionUID = -3684043338580885551L;

    public PropertiesUtils(Properties prop) {
        super(prop);
    }

    public PropertiesUtils() {
    }
}

Regards, Shuai
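One plausible cause — an assumption on my part, not confirmed in this thread — is that Kryo's default FieldSerializer copies declared fields, while java.util.Properties keeps its entries inside its Hashtable superclass (whose internal table is transient), so a field-by-field copy yields objects that are empty but not null, exactly the symptom described. A minimal Python analogy of that failure mode (plain Python, no Spark or Kryo involved):

```python
import pickle

# Analogue of "PropertiesUtils extends Properties": the payload lives in
# the dict superclass, not in instance attributes.
class PropertiesLike(dict):
    pass

original = PropertiesLike({"model": "v1", "region": "us-east-1"})

# Field-oriented "serialization": copy instance attributes only, the way a
# FieldSerializer-style strategy sees the object. The inherited dict
# entries are invisible to it.
field_copy = PropertiesLike()
field_copy.__dict__.update(original.__dict__)

# A container-aware serializer (like plain Java serialization, or Kryo with
# an appropriate serializer registered) round-trips the entries intact.
round_trip = pickle.loads(pickle.dumps(original))

print(len(field_copy))   # 0 -- "empty, not null", matching the symptom
print(len(round_trip))   # 2 -- entries survive
```

If this is the cause, registering a container-aware serializer for PropertiesUtils (Kryo ships a JavaSerializer fallback, for example) — or holding plain Properties[] as in the working variant above — avoids the field-based path.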
set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?
Hi All, I am a bit confused about spark.storage.memoryFraction. This is used to set the area for RDD usage — does that mean only cached and persisted RDDs? So if my program has no cached RDDs at all (i.e. I have no .cache() or .persist() calls on any RDD), can I set spark.storage.memoryFraction to a very small number, or even zero? I am writing a program which consumes a lot of memory (broadcast values, runtime, etc.), but I have no cached RDDs — should I just turn spark.storage.memoryFraction down to 0 (will that help me improve performance)? And I have another issue with broadcast: when I try to get a broadcast value, it throws an out-of-memory error. Which part of memory should I allocate more to (if I can't increase my overall memory size)?

java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

Regards, Shuai
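On the first question: in Spark 1.x, spark.storage.memoryFraction governs the region used for cached/persisted RDD blocks — but broadcast values are also held by the BlockManager in that same storage region (the stack trace above fails inside MemoryStore while unrolling a TorrentBroadcast block). So with no .cache()/.persist() calls the fraction can be lowered, but not safely to 0 if large broadcasts are in play. A hedged spark-defaults.conf sketch (values illustrative; the 0.6/0.2 defaults are per the 1.x configuration docs):

```properties
# default 0.6; no cached RDDs here, but keep a margin for broadcast blocks
spark.storage.memoryFraction   0.2
# default 0.2; can be raised if shuffles are the heavy memory consumer
spark.shuffle.memoryFraction   0.4
```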
RE: --driver-memory parameter doesn't work for spark-submit on yarn?
Hi Akhil, Thanks a lot! After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this leaves me confused: so the --driver-memory option doesn't work for spark-submit to YARN (I haven't checked other clusters)? Is it a bug? Regards, Shuai

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submit on yarn?

Once you submit the job, do a ps aux | grep spark-submit and see how much heap space is allocated to the process (the -Xmx params); if you are seeing a lower value you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards

On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is my shell script:

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties

My driver will load some resources and then broadcast them to all executors. The resources are only 600MB in serialized form, but I always get an out-of-memory exception; it looks like the right amount of memory isn't being allocated to my driver.

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

Do I do anything wrong here? No matter what I set for the --driver-memory value (from 512M to 20G), it always gives me an error on the same line (the line that tries to load a 600MB Java serialization file). So it looks like the script doesn't allocate the right memory to the driver in my case? Regards, Shuai
RE: --driver-memory parameter doesn't work for spark-submit on yarn?
Nice. But my case shows the same issue even when I use yarn-client; I verified it several times. And I am running 1.3.0 on EMR (using the version dispatched by the installSpark script from AWS). I agree _JAVA_OPTIONS is not the right solution, but I will use it until 1.4.0 is out :) Regards, Shuai

-----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 01, 2015 10:51 AM To: Shuai Zheng Cc: Akhil Das; user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submit on yarn?

I feel like I recognize that problem, and it's almost the inverse of https://issues.apache.org/jira/browse/SPARK-3884, which I was looking at today. The spark-class script didn't seem to handle all the ways that driver memory can be set. I think this is also something fixed by the new launcher library in 1.4.0. _JAVA_OPTIONS is not a good solution since it's global.

On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Akhil, Thanks a lot! After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this leaves me confused: so the --driver-memory option doesn't work for spark-submit to YARN (I haven't checked other clusters)? Is it a bug? Regards, Shuai

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submit on yarn?

Once you submit the job, do a ps aux | grep spark-submit and see how much heap space is allocated to the process (the -Xmx params); if you are seeing a lower value you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards

On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is my shell script:

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties

My driver will load some resources and then broadcast them to all executors. The resources are only 600MB in serialized form, but I always get an out-of-memory exception; it looks like the right amount of memory isn't being allocated to my driver.

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

Do I do anything wrong here? No matter what I set for the --driver-memory value (from 512M to 20G), it always gives me an error on the same line (the line that tries to load a 600MB Java serialization file). So it looks like the script doesn't allocate the right memory to the driver in my case? Regards, Shuai

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
--driver-memory parameter doesn't work for spark-submit on yarn?
Hi All, Below is my shell script: /home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties My driver will load some resources and then broadcast them to all executors. The resource is only 600MB in serialized form, but I always get an out-of-memory exception; it looks like the right amount of memory is not allocated to my driver. Exception in thread "main" java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newArray(Native Method) at java.lang.reflect.Array.newInstance(Array.java:70) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68) Am I doing anything wrong here? And no matter what I set for the --driver-memory value (from 512M to 20G), it always gives me an error on the same line (the line that tries to load a 600MB Java serialization file). So it looks like the script doesn't allocate the right memory to the driver in my case? Regards, Shuai
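A note on why _JAVA_OPTIONS helps here (my reading of the thread, stated as a hedged sketch, not from the original emails): in yarn-client mode the driver is the spark-submit JVM itself, so its heap must be fixed before that JVM starts; setting it from application code comes too late.

```scala
import org.apache.spark.SparkConf

// Hedged sketch: this setting is harmless but has NO effect on the driver in
// yarn-client mode, because the driver JVM is already running when this line
// executes. Driver heap must come from spark-submit --driver-memory,
// spark-defaults.conf, or (globally, as a last resort) _JAVA_OPTIONS.
val conf = new SparkConf().set("spark.driver.memory", "5g") // too late here

// Verify what the driver JVM actually received:
println(s"Driver max heap: ${Runtime.getRuntime.maxMemory / (1024L * 1024L)} MB")
```

Comparing this printed value against the --driver-memory flag is a quick way to tell whether the launch script honored the setting.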
When will 1.3.1 release?
Hi All, I am waiting for Spark 1.3.1 to fix the bug in working with the S3 file system. Does anyone know the release date for 1.3.1? I can't downgrade to 1.2.1 because there is a jar compatibility issue in working with the AWS SDK. Regards, Shuai
What is the jvm size when start spark-submit through local mode
Hi, I am curious: when I start a Spark program in local mode, which parameter decides the JVM memory size for the executor? In theory it should be: --executor-memory 20G But I remember local mode only starts the Spark executor in the same process as the driver, so it should be: --driver-memory 20G Regards, Shuai
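To the best of my understanding (hedged, not an answer from this thread): in local mode there is no separate executor JVM, so --executor-memory is effectively ignored and the single JVM's heap comes from the driver memory settings. A quick check from spark-shell:

```scala
// In local mode, driver and "executor" share one JVM, so this reports the
// heap set via --driver-memory / spark.driver.memory, not --executor-memory.
val maxHeapMb = Runtime.getRuntime.maxMemory / (1024L * 1024L)
println(s"local-mode JVM max heap: $maxHeapMb MB")
```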
RE: Spark will process _temporary folder on S3 is very slow and always cause failure
Thanks! Let me update the status. I have copied the DirectOutputCommitter to my local project. And set: conf.set("spark.hadoop.mapred.output.committer.class", "org..DirectOutputCommitter") It works perfectly. Thanks everyone :) Regards, Shuai From: Aaron Davidson [mailto:ilike...@gmail.com] Sent: Tuesday, March 17, 2015 3:06 PM To: Imran Rashid Cc: Shuai Zheng; user@spark.apache.org Subject: Re: Spark will process _temporary folder on S3 is very slow and always cause failure Actually, this is the more relevant JIRA (which is resolved): https://issues.apache.org/jira/browse/SPARK-3595 6352 is about saveAsParquetFile, which is not in use here. Here is a DirectOutputCommitter implementation: https://gist.github.com/aarondav/c513916e72101bbe14ec and it can be configured in Spark with: sparkConf.set("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName) On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid iras...@cloudera.com wrote: I'm not super familiar w/ S3, but I think the issue is that you want to use a different output committer with object stores, which don't have a simple move operation. There have been a few other threads on S3 output committers. I think the most relevant for you is probably this open JIRA: https://issues.apache.org/jira/browse/SPARK-6352 On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I try to run a sort on an r3.2xlarge instance on AWS. I just try to run it as a single-node cluster for a test. The data I use to sort is around 4GB and sits on S3; output will also be on S3. I just connect spark-shell to the local cluster and run the code as a script (because I just want a benchmark now). 
My job is as simple as:
val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }
sortedResult.saveAsTextFile("s3n://myplace")
The job takes around 6 mins to finish the sort when I am monitoring the process. Then I notice the process stops at: 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at console:31, took 581.304992 s At that point, Spark actually just writes all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the ready results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files. org.apache.http.NoHttpResponseException: The target server failed to respond at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101) at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252) at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281) at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247) at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219) I tried several times, but never got the full job to finish. I am not sure anything is wrong here; I use something very basic, and I can see the job has finished and all results are on S3 under the _temporary folder, but then it raises the exception and fails. Any special setting I should use here when dealing with S3? I don't know what the issue is here; I have never seen MapReduce have a similar issue. 
So it could not be S3’s problem. Regards, Shuai
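For reference, the fix Shuai and Aaron describe amounts to the following configuration (a sketch; the DirectOutputCommitter class is the user-supplied implementation from Aaron's gist, and the package name below is hypothetical):

```scala
import org.apache.spark.SparkConf

// Route Hadoop's output commit through a direct committer so task output is
// written straight to its final S3 location, skipping the slow
// _temporary-folder rename phase that S3 implements as copy+delete.
val conf = new SparkConf()
  .set("spark.hadoop.mapred.output.committer.class",
       "com.example.DirectOutputCommitter") // hypothetical package; use yours
```

Note the trade-off: a direct committer gives up the atomicity of the rename-based commit, so failed or speculative tasks can leave partial output behind.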
RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Below is the output:
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 1967947
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 2024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 1967947
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
I have set the max open files to 2024 by ulimit -n 2024, but I get the same issue. I am not sure whether that is a reasonable setting. Actually I am doing a loop; each iteration tries to sort only 3GB of data. It runs very quickly in the first loop and slows down in the second loop. In each iteration I start and destroy the context (because I want to clean up the temp files created under the tmp folder, which take a lot of space). Otherwise just default settings. My logic:
For loop:
  val sc = new SparkContext(...)
  val sql = sc.loadParquet(...)
  sortByKey
  sc.stop()
End
And I run on EC2 c3.8xlarge, Amazon Linux AMI 2014.09.2 (HVM). From: java8964 [mailto:java8...@hotmail.com] Sent: Friday, March 20, 2015 3:54 PM To: user@spark.apache.org Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1 Could it be the ulimit for the user running Spark on your nodes? Can you run ulimit -a as the user who is running Spark on the executor node? Does the result make sense for the data you are trying to process? Yong _ From: szheng.c...@gmail.com To: user@spark.apache.org Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1 Date: Fri, 20 Mar 2015 15:28:26 -0400 Hi All, I try to run a simple sort-by on 1.2.1. 
And it always gives me the two errors below: 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files) And then I switch to: conf.set("spark.shuffle.consolidateFiles", "true").set("spark.shuffle.manager", "SORT") Then I get the error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157) I roughly know the first issue is because the Spark shuffle creates too many local temp files (and I don't know the solution, because it looks like my workaround also causes other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Hi All, I try to run a simple sort-by on 1.2.1. And it always gives me the two errors below: 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files) And then I switch to: conf.set("spark.shuffle.consolidateFiles", "true").set("spark.shuffle.manager", "SORT") Then I get the error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157) I roughly know the first issue is because the Spark shuffle creates too many local temp files (and I don't know the solution, because it looks like my workaround also causes other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
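For completeness, the two settings mentioned above written out as valid code (a sketch; note the sort-shuffle value is usually given lowercase, and consolidateFiles only affects the hash-shuffle path):

```scala
import org.apache.spark.SparkConf

// Sort-based shuffle writes far fewer files per map task than hash shuffle,
// which eases "Too many open files"; consolidateFiles is a hash-shuffle knob.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")
  .set("spark.shuffle.consolidateFiles", "true")
```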
[SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.
Hi All, I am running Spark 1.2.1 and the AWS SDK. To make sure AWS is compatible with httpclient 4.2 (which I assume Spark uses?), I have already downgraded the SDK to version 1.9.0. But even then, I still get an error: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.&lt;init&gt;(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.&lt;init&gt;(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.&lt;init&gt;(PoolingClientConnectionManager.java:99) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102) at com.amazonaws.http.AmazonHttpClient.&lt;init&gt;(AmazonHttpClient.java:190) at com.amazonaws.AmazonWebServiceClient.&lt;init&gt;(AmazonWebServiceClient.java:119) at com.amazonaws.services.s3.AmazonS3Client.&lt;init&gt;(AmazonS3Client.java:410) at com.amazonaws.services.s3.AmazonS3Client.&lt;init&gt;(AmazonS3Client.java:392) at com.amazonaws.services.s3.AmazonS3Client.&lt;init&gt;(AmazonS3Client.java:376) When I search the mailing list, it looks like the same issue as: https://github.com/apache/spark/pull/2535 http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi But I don't understand the solution mentioned there. The issue is caused by a pre-packaged DefaultClientConnectionOperator in the Spark all-in-one jar file which doesn't have that method. I have some questions here: How can we find out which exact versions Spark pre-packages (this is really very painful), and how can we override them? 
I have tried: val conf = new SparkConf().set("spark.files.userClassPathFirst", "true") // For non-YARN apps before Spark 1.3 .set("spark.executor.userClassPathFirst", "true") // For Spark 1.3.0 But it doesn't work. This really creates a lot of issues for me (especially since we don't know what versions Spark used to package its own jar; we need to try them out). Even Maven doesn't give enough information, because httpclient is not under the Maven dependencies (not even an indirect dependency, after I used tools to resolve the whole dependency tree). Regards, Shuai
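One way to answer "which httpclient did Spark actually bundle?" is to inspect the assembly jar directly (a hedged sketch; the jar path is an assumption, adjust it to your install):

```scala
import java.util.jar.JarFile
import scala.collection.JavaConverters._

// List the httpclient connection classes shaded into the assembly, to pin
// down which version (and which constructor signatures) are on the classpath.
val jar = new JarFile(
  "/home/hadoop/spark/lib/spark-assembly-1.2.1-hadoop2.4.0.jar") // assumed path
jar.entries.asScala
  .map(_.getName)
  .filter(_.startsWith("org/apache/http/impl/conn/"))
  .foreach(println)
jar.close()
```

Cross-checking the listed class files with javap against the NoSuchMethodError signature tells you whether the shaded version predates the DnsResolver constructor.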
RE: Process time series RDD after sortByKey
Hi Imran, I am a bit confused here. Assume I have RDD a with 1000 partitions which has also been sorted. How can I control, when creating RDD b (with 20 partitions), to make sure partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control code/logic here. Your code below: val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) Does it mean I need to define/develop my own MyGroupingRDD class? I am not very clear on how to do that; is there any place I can find an example? I have never created my own RDD class before (only RDD instances :)). But this is a very valuable approach to me, so I am eager to learn. Regards, Shuai From: Imran Rashid [mailto:iras...@cloudera.com] Sent: Monday, March 16, 2015 11:22 AM To: Shawn Zheng; user@spark.apache.org Subject: Re: Process time series RDD after sortByKey Hi Shuai, On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote: Sorry I responded late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partitions, the best case is that I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. If we have the ability to do this, we will gain huge flexibility when we try to process some time-series-like data, and a lot of algorithms will benefit from it. Yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD; it won't create any tasks, it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. 
It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions: val rawData1000Partitions = sc.textFile(...) // or whatever val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} // etc. Note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever ways it feels like, while you want them combined in a very particular order. So you'll need to create your own subclass. Back to Zhan Zhang's: while (iterPartition &lt; RDD.partitions.length) { val res = sc.runJob(this, (it: Iterator[T]) =&gt; someFunc, iterPartition, allowLocal = true) // Some other function after processing one partition. iterPartition += 1 } I am curious how Spark processes this without parallelism. Will the individual partition be passed back to the driver to process, or does it just run one task on the node where that partition exists, followed by another partition on another node? Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling takes data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after it's done it will ship the *result* back to the driver. Then the process gets repeated for all the other partitions. If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator. That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc. Imran
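The MyGroupingRDD subclass is left as an exercise above; here is a hedged sketch of the shape such a class could take (my own simplification, modeled on CoalescedRDD but keeping the parent partitions in contiguous order; not code from this thread, and it omits preferredLocations, which a real version should delegate to its parents for locality):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical sketch: partition i of this RDD reads a contiguous slice of
// the parent's partitions, preserving their order (unlike coalesce()).
class MyGroupingRDD[T: ClassTag](parent: RDD[T], numGroups: Int)
    extends RDD[T](parent) {

  private case class GroupPartition(index: Int, parents: Seq[Partition])
      extends Partition

  override def getPartitions: Array[Partition] = {
    val per = math.ceil(parent.partitions.length.toDouble / numGroups).toInt
    parent.partitions.grouped(per).zipWithIndex.map {
      case (ps, i) => GroupPartition(i, ps): Partition
    }.toArray
  }

  override def compute(split: Partition, ctx: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupPartition].parents.iterator
      .flatMap(p => parent.iterator(p, ctx))
}

// Usage, matching the thread: 1000 partitions grouped down to 20, in order.
// val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions, 20)
```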
RE: [SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.
And it is a NoSuchMethodError, not a ClassNotFoundException. And by default, I think Spark is only compiled against Hadoop 2.2? For this issue itself, I just checked the latest Spark (1.3.0); its version can work (because it packages a newer version of httpclient, and I can see the method is there, although I still don't know the exact version). But this doesn't really solve the whole problem: it is very unclear what versions of third-party libraries Spark uses, and even if there is some way to figure it out, it is still a horrible exercise. From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Monday, March 16, 2015 1:06 PM To: Shuai Zheng Cc: user Subject: Re: [SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator. From my local maven repo:
$ jar tvf ~/.m2/repository/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar | grep SchemeRegistry
1373 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/impl/conn/SchemeRegistryFactory.class
2954 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/conn/scheme/SchemeRegistry.class
2936 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/auth/AuthSchemeRegistry.class
If you run mvn dependency:tree, you would see something similar to the following:
[INFO] | +- org.apache.hadoop:hadoop-client:jar:2.6.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.6.0:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | | +- commons-io:commons-io:jar:2.4:compile
[INFO] | | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | | +- commons-lang:commons-lang:jar:2.6:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | | | +-
org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] | | | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile
[INFO] | | | +- org.apache.avro:avro:jar:1.7.6:compile
[INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] | | | +- com.google.code.gson:gson:jar:2.2.4:compile
[INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.6.0:compile
[INFO] | | | | +- org.apache.httpcomponents:httpclient:jar:4.2.5:compile
Cheers On Mon, Mar 16, 2015 at 9:38 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am running Spark 1.2.1 and the AWS SDK. To make sure AWS is compatible with httpclient 4.2 (which I assume Spark uses?), I have already downgraded the SDK to version 1.9.0. But even then, I still get an error: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.&lt;init&gt;(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.&lt;init&gt;(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.&lt;init&gt;(PoolingClientConnectionManager.java:99) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102) at com.amazonaws.http.AmazonHttpClient.&lt;init&gt;(AmazonHttpClient.java:190) at com.amazonaws.AmazonWebServiceClient.&lt;init&gt;(AmazonWebServiceClient.java:119) at com.amazonaws.services.s3.AmazonS3Client.&lt;init&gt;(AmazonS3Client.java:410) at com.amazonaws.services.s3.AmazonS3Client.&lt;init&gt;(AmazonS3Client.java:392) at com.amazonaws.services.s3.AmazonS3Client.&lt;init&gt;(AmazonS3Client.java:376) When I search the mailing list, it looks like the same issue as: https://github.com/apache/spark/pull/2535 
http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi But I don't understand the solution mentioned there. The issue is caused by a pre-packaged DefaultClientConnectionOperator in the Spark all-in-one jar file which doesn't have that method. I have some questions here: How can we find out which exact versions Spark pre-packages (this is really very painful), and how can we override them? I have tried: val conf = new SparkConf().set("spark.files.userClassPathFirst", "true") // For non-YARN apps before Spark 1.3 .set("spark.executor.userClassPathFirst", "true") // For Spark 1.3.0 But it doesn't work. This really creates a lot of issues for me (especially since we don't know what version is used by Spark to package
sqlContext.parquetFile doesn't work with s3n in version 1.3.0
Hi All, I just upgraded the system to version 1.3.0, but now sqlContext.parquetFile doesn't work with s3n. I have tested the same code with 1.2.1 and it works. A simple test running in spark-shell: val parquetFile = sqlContext.parquetFile("s3n:///test/2.parq") java.lang.IllegalArgumentException: Wrong FS: s3n:///test/2.parq, expected: file:/// And the same test works with spark-shell under 1.2.1. Regards, Shuai
RE: sqlContext.parquetFile doesn't work with s3n in version 1.3.0
I see, but this is really a big issue. Is there any way for me to work around it? I tried to set fs.default.name = s3n, but it doesn't seem to work. I must upgrade to 1.3.0 because I face a package incompatibility issue in 1.2.1, and if I must patch something, I would rather go with the latest version. Regards, Shuai From: Kelly, Jonathan [mailto:jonat...@amazon.com] Sent: Monday, March 16, 2015 2:54 PM To: Shuai Zheng; user@spark.apache.org Subject: Re: sqlContext.parquetFile doesn't work with s3n in version 1.3.0 See https://issues.apache.org/jira/browse/SPARK-6351 ~ Jonathan From: Shuai Zheng szheng.c...@gmail.com Date: Monday, March 16, 2015 at 11:46 AM To: user@spark.apache.org user@spark.apache.org Subject: sqlContext.parquetFile doesn't work with s3n in version 1.3.0 Hi All, I just upgraded the system to version 1.3.0, but now sqlContext.parquetFile doesn't work with s3n. I have tested the same code with 1.2.1 and it works. A simple test running in spark-shell: val parquetFile = sqlContext.parquetFile("s3n:///test/2.parq") java.lang.IllegalArgumentException: Wrong FS: s3n:///test/2.parq, expected: file:/// And the same test works with spark-shell under 1.2.1. Regards, Shuai
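One experiment worth trying while waiting on SPARK-6351 (hedged; this is my suggestion, not from the thread, and whether it dodges the bug depends on the 1.3.0 code path):

```scala
// The generic data-source load API in 1.3.0 may resolve the path differently
// than sqlContext.parquetFile; treat this purely as an experiment.
// "<bucket>" is a placeholder for your bucket name.
val df = sqlContext.load("s3n://<bucket>/test/2.parq", "parquet")
```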
Spark will process _temporary folder on S3 is very slow and always cause failure
Hi All, I try to run a sort on an r3.2xlarge instance on AWS. I just try to run it as a single-node cluster for a test. The data I use to sort is around 4GB and sits on S3; output will also be on S3. I just connect spark-shell to the local cluster and run the code as a script (because I just want a benchmark now). My job is as simple as:
val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }
sortedResult.saveAsTextFile("s3n://myplace")
The job takes around 6 mins to finish the sort when I am monitoring the process. Then I notice the process stops at: 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at console:31, took 581.304992 s At that point, Spark actually just writes all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the ready results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files. org.apache.http.NoHttpResponseException: The target server failed to respond at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101) at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252) at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281) at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247) at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219) I tried several times, but never got the full job to finish. 
I am not sure anything is wrong here; I use something very basic, and I can see the job has finished and all results are on S3 under the _temporary folder, but then it raises the exception and fails. Any special setting I should use here when dealing with S3? I don't know what the issue is here; I have never seen MapReduce have a similar issue. So it could not be S3's problem. Regards, Shuai
RE: Spark will process _temporary folder on S3 is very slow and always cause failure
And one thing I forgot to mention: even though I get this exception and the result is not well formed in my target folder (part of the files are there; the rest are under the different folder structure of the _temporary folder), in the spark-shell web UI it is still marked as a successful step. I think this is a bug? Regards, Shuai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Friday, March 13, 2015 6:51 PM To: user@spark.apache.org Subject: Spark will process _temporary folder on S3 is very slow and always cause failure Hi All, I try to run a sort on an r3.2xlarge instance on AWS. I just try to run it as a single-node cluster for a test. The data I use to sort is around 4GB and sits on S3; output will also be on S3. I just connect spark-shell to the local cluster and run the code as a script (because I just want a benchmark now). My job is as simple as:
val parquetFile = sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }
sortedResult.saveAsTextFile("s3n://myplace")
The job takes around 6 mins to finish the sort when I am monitoring the process. Then I notice the process stops at: 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at console:31, took 581.304992 s At that point, Spark actually just writes all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the ready results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files. 
org.apache.http.NoHttpResponseException: The target server failed to respond at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101) at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252) at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281) at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247) at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219) I tried several times, but never got the full job to finish. I am not sure anything is wrong here; I use something very basic, and I can see the job has finished and all results are on S3 under the _temporary folder, but then it raises the exception and fails. Any special setting I should use here when dealing with S3? I don't know what the issue is here; I have never seen MapReduce have a similar issue. So it could not be S3's problem. Regards, Shuai
jar conflict with Spark default packaging
Hi All, I am running Spark to work with AWS. The latest AWS SDK version works with httpclient 4.3+. But the spark-assembly-*.jar file packages an old httpclient version, which causes: ClassNotFoundException for org/apache/http/client/methods/HttpPatch. Even when I put the right httpclient jar there, it doesn't help because Spark always takes the class from its own packaging first. I don't know why Spark only provides one big package that doesn't allow us to customize the library loading order. I know I can just rebuild Spark, but this is very troublesome and is not a general long-term solution (I can't rebuild the Spark jar every time I have a jar conflict, since Spark is supposed to run as a cluster). In Hadoop, we have mapreduce.job.user.classpath.first=true, but spark.yarn.user.classpath.first only works for YARN. I think I am not the only one who faces this issue. Does anyone have a more general solution for this? Regards, Shuai
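The knobs discussed in this thread, written out as code (a sketch; availability differs by version, and around Spark 1.2-1.3 these settings were experimental):

```scala
import org.apache.spark.SparkConf

// Prefer user-supplied jars over classes shaded into the Spark assembly,
// where the running version supports it.
val conf = new SparkConf()
  .set("spark.yarn.user.classpath.first", "true")   // YARN-only analogue of
                                                    // mapreduce.job.user.classpath.first
  .set("spark.executor.userClassPathFirst", "true") // executors, roughly 1.3+
```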
How to pass parameter to spark-shell when choose client mode --master yarn-client
Hi All, I try to pass parameters to spark-shell when I do some tests: spark-shell --driver-memory 512M --executor-memory 4G --master spark://:7077 --conf spark.sql.parquet.compression.codec=snappy --conf spark.sql.parquet.binaryAsString=true This works fine on my local PC. But when I start EMR and pass similar things: ~/spark/bin/spark-shell --executor-memory 40G --master yarn-client --num-executors 1 --executor-cores 32 --conf spark.sql.parquet.compression.codec=snappy --conf spark.sql.parquet.binaryAsString=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer the parameters don't get passed to spark-shell. Does anyone know why? And what are the alternatives for me to do this? Regards, Shuai
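A quick way to see whether the --conf values actually reached the shell (a small sketch to run inside spark-shell):

```scala
// Each line prints the configured value, or the fallback string if the flag
// never made it into the SparkConf.
println(sc.getConf.get("spark.sql.parquet.compression.codec", "<not set>"))
println(sc.getConf.get("spark.serializer", "<not set>"))
```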
Read Parquet file from scala directly
Hi All, I have a lot of parquet files, and I want to open them directly instead of loading them into an RDD in the driver (so I can optimize some performance through special logic). But I did some research online and can't find any example of accessing parquet directly from Scala. Has anyone done this before? Regards, Shuai
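One approach I have seen for reading parquet without Spark is the parquet-mr reader API (a hedged sketch, not from this thread; the parquet-avro artifact, its pre-rename `parquet.avro` package, and this constructor are assumptions about the version on your classpath):

```scala
import org.apache.hadoop.fs.Path
import org.apache.avro.generic.GenericRecord
import parquet.avro.AvroParquetReader // old parquet-mr coordinates; assumed

// Stream records straight from a parquet file, no RDD involved.
val reader = new AvroParquetReader[GenericRecord](new Path("file:///data/2.parq"))
Iterator.continually(reader.read())
  .takeWhile(_ != null)
  .take(5)
  .foreach(println)
reader.close()
```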
How to preserve/preset partition information when load time series data?
Hi All, I have a set of time series data files in parquet format, and the data for each day is stored under a naming convention, but I will not know how many files there are for one day. 20150101a.parq 20150101b.parq 20150102a.parq 20150102b.parq 20150102c.parq . 201501010a.parq . Now I am trying to write a program to process the data, and I want to make sure each day's data goes into one partition. Of course I can load everything into one big RDD and then partition it, but that will be very slow. Since I already know the time series from the file names, is it possible for me to load the data into an RDD and also preserve the partitioning? I know I can preserve one partition per file, but is there any way for me to load the RDD and preserve partitioning based on a set of files: one partition, multiple files? I think it is possible because when I load an RDD from 100 files (assume they span 100 days), I will have 100 partitions (if I disable file splitting when loading). Then I could use a special coalesce to repartition the RDD? But I don't know whether it is possible to do that in current Spark. Regards, Shuai
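A sketch of one way to get "one partition per day" from the naming convention (my own suggestion, hedged; assumes parquetFile returns an RDD as with 1.2.x's SchemaRDD, that the glob matches the convention above, and that collapsing each day to a single partition is acceptable memory-wise):

```scala
// Build one RDD per day via a filename glob, collapse each day's files into
// a single partition, then union; partition i of the result is day i.
// "<bucket>" is a placeholder for your bucket name.
val days = Seq("20150101", "20150102") // derived from the naming convention
val perDay = days.map { d =>
  sqlContext.parquetFile(s"s3n://<bucket>/$d*.parq").coalesce(1)
}
val oneDayPerPartition = sc.union(perDay)
```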
Process time series RDD after sortByKey
Hi All, I am processing some time series data. One day might have 500GB, so each hour is around 20GB of data. I need to sort the data before I start processing. Assume I can sort it successfully with dayRDD.sortByKey, but after that I might have thousands of partitions (needed to make the sort succeed), say 1000 partitions. Then I want to process the data by hour (it doesn't need to be exactly one hour, just a similar time frame). And I can't just repartition to 24, because then one partition might be too big to fit into memory (if it is 20GB). So is there any way for me to process the underlying partitions in a certain order? Basically I want to call mapPartitionsWithIndex with a range of indexes. Is there any way to do it? Hope I have described my issue clearly. :) Regards, Shuai
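One common way to approximate "mapPartitionsWithIndex over a range" is to pass the real mapPartitionsWithIndex a function that only does work when the index falls inside the desired range. A plain-Python sketch of that filtering logic (the partition contents and the processing function are invented for illustration):

```python
def process_partition_range(partitions, lo, hi, func):
    """Apply func only to partitions whose index is in [lo, hi),
    mimicking mapPartitionsWithIndex with an index guard; other
    partitions contribute nothing."""
    out = []
    for idx, part in enumerate(partitions):
        if lo <= idx < hi:
            out.extend(func(part))
    return out

# 6 tiny "partitions" of already-sorted data; process only indices 2..3
partitions = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]]
result = process_partition_range(partitions, 2, 4, lambda p: [x * 10 for x in p])
print(result)  # [50, 60, 70, 80]
```

In Spark the same guard inside mapPartitionsWithIndex lets one job sweep an hour's worth of consecutive partitions without repartitioning to 24 oversized partitions.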
RE: Union and reduceByKey will trigger shuffle even same partition?
Hi Imran, I will say your explanation is extremely helpful :) I tested some ideas based on your explanation and it makes perfect sense to me. I modified my code to use cogroup+mapValues instead of union+reduceByKey to preserve the partitioner, which gives me more than a 100% performance gain (for the loop part). Thanks a lot! And I am curious: is there any easy way for me to get a detailed DAG execution plan description without running the code? Just like the explain command in Pig or SQL? Shuai From: Imran Rashid [mailto:iras...@cloudera.com] Sent: Monday, February 23, 2015 6:00 PM To: Shuai Zheng Cc: Shao, Saisai; user@spark.apache.org Subject: Re: Union and reduceByKey will trigger shuffle even same partition? I think you're getting tripped up by lazy evaluation and the way stage boundaries work (admittedly it's pretty confusing in this case). It is true that up until recently, if you unioned two RDDs with the same partitioner, the result did not have the same partitioner. But that was just fixed here: https://github.com/apache/spark/pull/4629 That does mean that after you update ranks, it will no longer have a partitioner, which will affect the join on your second iteration here: val contributions = links.join(ranks).flatMap But I think most of the shuffles you are pointing to are a different issue. I may be belaboring something you already know, but I think this is easily confusing. The first thing is understanding where you get stage boundaries, and how they are named. Each shuffle introduces a stage boundary. However, stages get named by the last thing in a stage, which is not necessarily what is causing the shuffle. E.g., reduceByKey() causes a shuffle, but we don't see that in a stage name. Similarly, map() does not cause a shuffle, but we see a stage with that name. So, what do the stage boundaries we see actually correspond to?
1) map -- that is doing the shuffle write for the following groupByKey
2) groupByKey -- in addition to reading the shuffle output from your map, this is *also* doing the shuffle write for the next shuffle you introduce w/ partitionBy
3) union -- this is doing the shuffle read from your partitionBy, and then all the work from there right up until the shuffle write for what comes immediately after union -- your reduceByKey
4) lookup is an action, which is why that has another stage
A couple of things to note: (a) your join does not cause a shuffle, b/c both RDDs share a partitioner; (b) you have two shuffles from groupByKey followed by partitionBy -- you really probably want the one-arg form, groupByKey(partitioner). Hopefully this is helpful for understanding how your stages and shuffles correspond to your code. Imran On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng szheng.c...@gmail.com wrote: This also triggers an interesting question: how can I do this locally by code if I want? For example: I have RDD A and B, which have some partitioning; if I want to join A to B, I might just want to do a map-side join (although B itself might be big, B's local partition is known to be small enough to fit in memory). How can I access another RDD's local partition in the mapPartitions method? Is there any way to do this in Spark? From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know the data is already well partitioned. So if you want to avoid the shuffle, you have to write the code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic.
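Saisai's suggestion above -- reduce by key locally inside mapPartitions instead of calling reduceByKey -- amounts to building a per-partition hash map. A plain-Python sketch of that local reduce (no Spark; the partition data is invented):

```python
def local_reduce_by_key(partition, combine):
    """Combine values for each key within a single partition, the way a
    mapPartitions-based local reduce would -- no cross-partition shuffle
    is needed when the data is already partitioned by key."""
    acc = {}
    for key, value in partition:
        acc[key] = combine(acc[key], value) if key in acc else value
    return list(acc.items())

part = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]
print(sorted(local_reduce_by_key(part, lambda x, y: x + y)))
```

If every key lives in exactly one partition (e.g. after partitionBy with a HashPartitioner), this per-partition result is already the global result, which is why the explicit local reduce can replace the shuffling reduceByKey.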
Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple PageRank program, but it is slow. And I dug out part of the reason: a shuffle happens when I call union even though both RDDs share the same partitioner. Below is my test code in spark-shell:
import org.apache.spark.HashPartitioner
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist
for (i <- 1 until 2) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank
RE: Union and reduceByKey will trigger shuffle even same partition?
This also triggers an interesting question: how can I do this locally by code if I want? For example: I have RDD A and B, which have some partitioning; if I want to join A to B, I might just want to do a map-side join (although B itself might be big, B's local partition is known to be small enough to fit in memory). How can I access another RDD's local partition in the mapPartitions method? Is there any way to do this in Spark? From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know the data is already well partitioned. So if you want to avoid the shuffle, you have to write the code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple PageRank program, but it is slow.
And I dug out part of the reason: a shuffle happens when I call union even though both RDDs share the same partitioner. Below is my test code in spark-shell:
import org.apache.spark.HashPartitioner
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist
for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)
In the above code, links joins ranks and should preserve the partitioner, and leakedMatrix also shares the same partitioner, so I expected no shuffle on contributions.union(leakedMatrix), nor on the reduceByKey after it. But in the end there is a shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once, with everything after that being local operations, but the UI shows otherwise. Do I have a misunderstanding here?
Union and reduceByKey will trigger shuffle even same partition?
Hi All, I am running a simple PageRank program, but it is slow. And I dug out part of the reason: a shuffle happens when I call union even though both RDDs share the same partitioner. Below is my test code in spark-shell:
import org.apache.spark.HashPartitioner
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist
for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)
In the above code, links joins ranks and should preserve the partitioner, and leakedMatrix also shares the same partitioner, so I expected no shuffle on contributions.union(leakedMatrix), nor on the reduceByKey after it. But in the end there is a shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once, with everything after that being local operations, but the UI shows otherwise. Do I have a misunderstanding here?
RE: Union and reduceByKey will trigger shuffle even same partition?
In the book Learning Spark: So here does it mean that no shuffle happens across the network, but a shuffle is still done locally? Even if that is the case, why would union trigger a shuffle? I think union should just append the RDDs together. From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know the data is already well partitioned. So if you want to avoid the shuffle, you have to write the code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple PageRank program, but it is slow.
And I dug out part of the reason: a shuffle happens when I call union even though both RDDs share the same partitioner. Below is my test code in spark-shell:
import org.apache.spark.HashPartitioner
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist
for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)
In the above code, links joins ranks and should preserve the partitioner, and leakedMatrix also shares the same partitioner, so I expected no shuffle on contributions.union(leakedMatrix), nor on the reduceByKey after it. But in the end there is a shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once, with everything after that being local operations, but the UI shows otherwise. Do I have a misunderstanding here?
RE: How to design a long live spark application
Thanks. I thought about it: yes, the DAG engine should have no issue building the right graph in different threads (at least in theory). So now I have another question: if I have a context initiated but there is no operation on it for a very long time, will there be a timeout on it? How does Spark control/maintain/detect the liveness of the client SparkContext? Do I need to set up anything special? Regards, Shuai From: Eugen Cepoi [mailto:cepoi.eu...@gmail.com] Sent: Thursday, February 05, 2015 5:39 PM To: Shuai Zheng Cc: Corey Nolet; Charles Feduke; user@spark.apache.org Subject: Re: How to design a long live spark application Yes, you can submit multiple actions from different threads to the same SparkContext. It is safe. Indeed, what you want to achieve is quite common: expose some operations over a SparkContext through HTTP. I have used spray for this and it just worked fine. At bootstrap of your web app, start a SparkContext, maybe preprocess some data and cache it, then start accepting requests against this sc. Depending on where you place the initialization code, you can block the server from initializing until your context is ready. This is nice if you don't want to accept requests while the context is being prepared. Eugen 2015-02-05 23:22 GMT+01:00 Shuai Zheng szheng.c...@gmail.com: This example helps a lot :) But I am thinking about the below case: assume I have a SparkContext as a global variable; if I use multiple threads to access/use it, will it mess up? For example, my code:
public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el) throws IOException, InterruptedException {
  JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);
  lines.map(...)
  ...
  lines.count()
}
If two threads call this method at the same time and pass in the same SparkContext, will the SparkContext be thread-safe?
I am a bit worried here: in traditional Java it should be, but in the Spark context I am not 100% sure. Basically the SparkContext needs to be smart enough to differentiate the contexts of the different methods (RDDs added to it from different methods), and create two different DAGs for the different methods. Can anyone confirm this? This is not something I can easily test with code. Thanks! Regards, Shuai From: Corey Nolet [mailto:cjno...@gmail.com] Sent: Thursday, February 05, 2015 11:55 AM To: Charles Feduke Cc: Shuai Zheng; user@spark.apache.org Subject: Re: How to design a long live spark application Here's another lightweight example of running a SparkContext in a common Java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like Spark shell, have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering your questions about how to integrate with long-running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context + RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, initialize some RDDs, and then my Java application will use it to submit jobs.
So now I have some questions: 1, if I don't close it, is there any timeout I need to configure on the Spark server? 2, In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestions? I think my request is very common for application development; someone here must have done it before? Regards, Shawn
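The "one shared context, many threads" pattern Eugen confirms is safe can be sketched without Spark: a single long-lived engine object accepts jobs from several threads, and each submission keeps its own independent lineage and result (plain Python; all names here are invented for illustration).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class SharedContext:
    """Stand-in for a long-lived SparkContext: thread-safe job
    submission, each call building its own independent job."""
    def __init__(self):
        self._lock = threading.Lock()
        self.jobs_run = 0

    def run_job(self, data, func):
        # Each invocation works on its own data and lineage; only the
        # shared counter needs the lock.
        result = [func(x) for x in data]
        with self._lock:
            self.jobs_run += 1
        return sum(result)

ctx = SharedContext()  # one global context, as in the question
with ThreadPoolExecutor(max_workers=2) as pool:
    f1 = pool.submit(ctx.run_job, [1, 2, 3], lambda x: x * 2)
    f2 = pool.submit(ctx.run_job, [4, 5], lambda x: x + 1)
    print(f1.result(), f2.result(), ctx.jobs_run)  # 12 11 2
```

The design point mirrors the thread answer: because each job's state (its RDD lineage) is built per call rather than mutating shared state, concurrent submissions don't interleave destructively.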
How to design a long live spark application
Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, initialize some RDDs, and then my Java application will use it to submit jobs. So now I have some questions: 1, if I don't close it, is there any timeout I need to configure on the Spark server? 2, In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestions? I think my request is very common for application development; someone here must have done it before? Regards, Shawn
Use Spark as multi-threading library and deprecate web UI
Hi All, It might sound weird, but I think Spark is perfect to use as a multi-threading library in some cases. The local mode will naturally spawn multiple threads when required. And it is stricter, with less chance of a potential bug in the code (because it is data-oriented, not thread-oriented). Of course it cannot be used for all cases, but for most of my applications it is enough (90%). I want to hear other people's ideas about this. BTW: if I run Spark in local mode, how do I deprecate the web UI (which by default listens on 4040)? I don't want to start the UI every time if I use Spark as a local library. Regards, Shuai
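The "Spark local mode as a multi-threading library" idea boils down to: express the work as data-parallel transformations and let local[*] fan it out over the cores. A rough plain-Python analog of that style, using a thread pool instead of Spark (a sketch of the idea, not Spark itself):

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_map(data, func, workers=4):
    """Data-oriented parallelism: describe the transformation once and
    let the pool handle the threading -- the appeal local[*] has over
    hand-written thread code. Results come back in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(func, data))

print(parallel_map(range(5), lambda x: x * x))  # [0, 1, 4, 9, 16]
```

The trade-off the email describes: you give up fine-grained thread control, but the code has no shared mutable state to get wrong.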
RE: How to design a long live spark application
This example helps a lot :) But I am thinking about the below case: assume I have a SparkContext as a global variable; if I use multiple threads to access/use it, will it mess up? For example, my code:
public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext, Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el) throws IOException, InterruptedException {
  JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);
  lines.map(...)
  ...
  lines.count()
}
If two threads call this method at the same time and pass in the same SparkContext, will the SparkContext be thread-safe? I am a bit worried here: in traditional Java it should be, but in the Spark context I am not 100% sure. Basically the SparkContext needs to be smart enough to differentiate the contexts of the different methods (RDDs added to it from different methods), and create two different DAGs for the different methods. Can anyone confirm this? This is not something I can easily test with code. Thanks! Regards, Shuai From: Corey Nolet [mailto:cjno...@gmail.com] Sent: Thursday, February 05, 2015 11:55 AM To: Charles Feduke Cc: Shuai Zheng; user@spark.apache.org Subject: Re: How to design a long live spark application Here's another lightweight example of running a SparkContext in a common Java servlet container: https://github.com/calrissian/spark-jetty-server On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com wrote: If you want to design something like Spark shell, have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering your questions about how to integrate with long-running jobs. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface.
However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and host the Spark context + RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server-side application: user submits a request → server runs a Spark application and returns (this might take a few seconds). So I want the server to keep a long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, initialize some RDDs, and then my Java application will use it to submit jobs. So now I have some questions: 1, if I don't close it, is there any timeout I need to configure on the Spark server? 2, In theory I want to design something similar to the Spark shell (which also hosts a default sc), just not shell-based. Any suggestions? I think my request is very common for application development; someone here must have done it before? Regards, Shawn
RE: Use Spark as multi-threading library and deprecate web UI
Nice. I just tried it and it works. Thanks very much! And I notice the below in the log:
15/02/05 11:19:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@NY02913D.global.local:8162]
15/02/05 11:19:10 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@NY02913D.global.local:8162/user/HeartbeatReceiver
As I understand it, local mode runs the driver and executors in the same Java process. So is there any way for me to also disable the above two listeners? Or are they not optional even in local mode? Regards, Shuai -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, February 05, 2015 10:53 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Use Spark as multi-threading library and deprecate web UI Do you mean disable the web UI? spark.ui.enabled=false Sure, it's useful with master = local[*] too. On Thu, Feb 5, 2015 at 9:30 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, It might sound weird, but I think Spark is perfect to use as a multi-threading library in some cases. The local mode will naturally spawn multiple threads when required. And it is stricter, with less chance of a potential bug in the code (because it is data-oriented, not thread-oriented). Of course it cannot be used for all cases, but for most of my applications it is enough (90%). I want to hear other people's ideas about this. BTW: if I run Spark in local mode, how do I deprecate the web UI (which by default listens on 4040)? I don't want to start the UI every time if I use Spark as a local library. Regards, Shuai
Re: Executor vs Mapper in Hadoop
Thanks a lot for clarifying it. Then I have some follow-up questions: 1, If we normally have 1 executor per machine, then in a cluster with different hardware capacities, for example one 8-core worker and one 4-core worker (ignoring the driver machine), if we set executor-cores=4, the 8-core worker will run 2 executors, am I right? If we set executor-cores=8, then the 4-core worker can't start any executor. A similar idea applies to memory allocation, so should we always use the same hardware configuration for the worker machines of a Spark cluster? 2, If I run Spark on YARN (actually EMR), where can I check/configure the default executor-cores? Regards, Shuai On Thu, Jan 15, 2015 at 11:13 PM, Sean Owen so...@cloudera.com wrote: An executor is specific to a Spark application, just as a mapper is specific to a MapReduce job. So a machine will usually be running many executors, and each is a JVM. A Mapper is single-threaded; an executor can run many tasks (possibly from different jobs within the application) at once. Yes, 5 executors with 4 cores should be able to process 20 tasks in parallel. In any normal case, you have 1 executor per machine per application. There are cases where you would make more than 1, but these are unusual. On Thu, Jan 15, 2015 at 8:16 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I try to clarify some behavior of Spark executors. Because I am from a Hadoop background, I try to compare them to the mapper (or reducer) in Hadoop. 1, Each node can have multiple executors, each running in its own process? This is the same as the mapper process. 2, I thought the Spark executor will use multi-threaded mode when there is more than 1 core allocated to it (for example: set executor-cores to 5). In this way, how many partitions can it process? For example, if the input is 20 partitions (similar to 20 splits as mapper input) and we have 5 executors, each with 4 cores.
Will all these partitions be processed at the same time (so each core processes one partition), or can one executor actually only run one partition at a time? I don't know whether my understanding is correct; please advise. BTW: In general practice, should we always try to set executor-cores to a higher number? So would we favor 10 cores * 2 executors over 2 cores * 10 executors? Any suggestions here? Thanks! Regards, Shuai
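Sean's answer reduces to simple arithmetic: concurrent task slots = executors × cores per executor, and a stage runs in ceil(partitions / slots) sequential "waves". A quick sketch using the numbers from the example above:

```python
import math

def task_waves(partitions, executors, cores_per_executor):
    """Return (concurrent task slots, number of sequential waves
    a stage needs) for a given cluster shape."""
    slots = executors * cores_per_executor
    return slots, math.ceil(partitions / slots)

# 20 partitions, 5 executors x 4 cores -> 20 slots -> 1 wave:
# every partition really is processed at the same time.
print(task_waves(20, 5, 4))   # (20, 1)
print(task_waves(100, 5, 4))  # (20, 5)
```

Note the slot count is the same for 10 cores × 2 executors and 2 cores × 10 executors; the difference between those shapes is about JVM heap size, GC behavior, and scheduling overhead, not raw parallelism.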
RE: Determine number of running executors
Hi Tobias, Can you share more information about how you do that? I also have a similar question about this. Thanks a lot, Regards, Shuai From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Wednesday, November 26, 2014 12:25 AM To: Sandy Ryza Cc: Yanbo Liang; user Subject: Re: Determine number of running executors Hi, Thanks for your help! Sandy, I had a bit of trouble finding the spark.executor.cores property. (It wasn't there, although its value should have been 2.) I ended up throwing regular expressions at scala.util.Properties.propOrElse("sun.java.command", ""), which worked surprisingly well ;-) Thanks Tobias
Executor parameter doesn't work for Spark-shell on EMR Yarn
Hi All, I am testing Spark on an EMR cluster. The environment is a one-node cluster, r3.8xlarge: 32 vCores and 244G memory. But the command line I use to start spark-shell doesn't work. For example:
~/spark/bin/spark-shell --jars /home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6 --executor-memory 10G
Neither the num-executors nor the memory setting takes effect. And more interesting, if I use test code:
val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75"))
var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum
it will start 32 executors (so I assume it tries to start one executor per vCore). But if I use some real data (the file size is 200M):
val lines = sc.textFile("s3://.../part-r-0")
var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum
it will only start 4 executors, which maps to the number of HDFS splits (200M will have 4 splits). So I have two questions: 1, Why is the setup parameter ignored by YARN? How can I limit the number of executors I run? 2, Why does my much smaller test data set trigger 32 executors, but my real 200M data set only gets 4? So how should I control the executor setup in spark-shell? And I printed the sparkConf; it shows much less than I expected, and I don't see my passed-in parameters there.
scala> sc.getConf.getAll.foreach(println)
(spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0)
(spark.app.id,local-1421353031552)
(spark.eventLog.enabled,true)
(spark.executor.id,driver)
(spark.repl.class.uri,http://10.181.82.38:58415)
(spark.driver.host,ip-10-181-82-38.ec2.internal)
(spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70)
(spark.app.name,Spark shell)
(spark.fileserver.uri,http://10.181.82.38:54666)
(spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk-1.9.14.jar)
(spark.eventLog.dir,hdfs:///spark-logs)
(spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)
(spark.master,local[*])
(spark.driver.port,54191)
(spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)
I searched the old threads; the attached email answers the question about why the vCore setting doesn't work, but I think that is not the same issue as mine. Otherwise the default YARN Spark setup couldn't do any adjustment? Regards, Shuai
---BeginMessage---
If you are using the capacity scheduler in YARN: by default the YARN capacity scheduler uses DefaultResourceCalculator. DefaultResourceCalculator considers only memory when allocating containers. You can use DominantResourceCalculator; it considers memory and CPU.
In capacity-scheduler.xml set yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator On 04/11/14 3:03 am, Gen gen.tan...@gmail.com wrote: Hi, Well, I didn't find the original documentation, but according to http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage , vcores are not physical CPU cores but virtual cores. And I used the top command to monitor CPU utilization during the Spark task. Spark can use all CPUs even if I leave --executor-cores at the default (1). Hope that helps. Cheers Gen Gen wrote: Hi, Maybe it is a stupid question, but I am running Spark on YARN. I request the resources with the following command: {code} ./spark-submit --master yarn-client --num-executors #number of workers --executor-cores #number of cores ... {code} However, after launching the task, I use /yarn node -status ID/ to monitor the situation of the cluster. It shows that the number of vcores used for each container is always 1, no matter what number I pass with --executor-cores. Any ideas how to solve this problem? Thanks a lot in advance for your help. Cheers Gen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot-change-vcores-in-yarn-tp17883p17992.html Sent from the Apache Spark User List mailing list archive at Nabble.com. ---End Message---
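The "4 executors for a 200M file" observation above matches how input splits drive parallelism: with the classic 64MB HDFS block size, a 200MB file yields ceil(200/64) = 4 splits, hence 4 partitions and 4 tasks. A quick check of that arithmetic (the 64MB block size is an assumption; EMR/Hadoop configurations vary):

```python
import math

def num_splits(file_size_mb, block_size_mb=64):
    """Number of HDFS input splits -- and so the default number of
    RDD partitions -- for a splittable file of the given size."""
    return math.ceil(file_size_mb / block_size_mb)

print(num_splits(200))       # 4 -> matches the 4 tasks observed
print(num_splits(200, 128))  # 2 with a 128MB block size
```

This is why the real file caps parallelism at 4 while the tiny parallelized list fans out to the full core count: textFile takes its partition count from splits, not from executor settings.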
Executor vs Mapper in Hadoop
Hi All, I try to clarify some behavior of Spark executors. Because I am from a Hadoop background, I try to compare them to the mapper (or reducer) in Hadoop. 1, Each node can have multiple executors, each running in its own process? This is the same as the mapper process. 2, I thought the Spark executor will use multi-threaded mode when there is more than 1 core allocated to it (for example: set executor-cores to 5). In this way, how many partitions can it process? For example, if the input is 20 partitions (similar to 20 splits as mapper input) and we have 5 executors, each with 4 cores. Will all these partitions be processed at the same time (so each core processes one partition), or can one executor actually only run one partition at a time? I don't know whether my understanding is correct; please advise. BTW: In general practice, should we always try to set executor-cores to a higher number? So would we favor 10 cores * 2 executors over 2 cores * 10 executors? Any suggestions here? Thanks! Regards, Shuai
RE: Executor parameter doesn't work for Spark-shell on EMR Yarn
I figured out the second question: if I don't pass in the number of partitions for the test data, it will by default assume the max number of executors (although I don't know what this default max is). val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75"), 2) will only trigger 2 executors. So I think the default executor number is decided by the first RDD operation that needs to send work to executors. This gives me a weird way to control the number of executors (run a fake/test code piece first to kick off the executors, then run the real job - because executors live for the whole lifecycle of the application?), although this may not have any real value in practice :) But I still need help with my first question. Thanks a lot. Regards, Shuai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Thursday, January 15, 2015 4:03 PM To: user@spark.apache.org Subject: RE: Executor parameter doesn't work for Spark-shell on EMR Yarn Forgot to mention: I use EMR AMI 3.3.1, Spark 1.2.0, Yarn 2.4. Spark is set up by the standard script: s3://support.elasticmapreduce/spark/install-spark From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Thursday, January 15, 2015 3:52 PM To: user@spark.apache.org Subject: Executor parameter doesn't work for Spark-shell on EMR Yarn Hi All, I am testing Spark on an EMR cluster. The env is a one-node cluster with an r3.8xlarge instance: 32 vCores and 244G memory. But the command line I use to start spark-shell doesn't work. For example: ~/spark/bin/spark-shell --jars /home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6 --executor-memory 10G Neither num-executors nor the memory setting takes effect. And more interesting, if I use test code: val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75")) var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum It will start 32 executors (so I assume it tries to start one executor for every vCore).
But if I use some real data (the file size is 200M): val lines = sc.textFile("s3://.../part-r-0") var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum It will only start 4 executors, which maps to the number of HDFS splits (200M yields 4 splits). So I have two questions: 1, Why is the setup parameter ignored by Yarn? How can I limit the number of executors? 2, Why does my much smaller test data set trigger 32 executors while my real 200M data set only gets 4? So how should I control the executor setup in spark-shell? And when I print the sparkConf, it shows much less than I expect, and I don't see my passed-in parameters there. scala> sc.getConf.getAll.foreach(println) (spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0) (spark.app.id,local-1421353031552) (spark.eventLog.enabled,true) (spark.executor.id,driver) (spark.repl.class.uri,http://10.181.82.38:58415) (spark.driver.host,ip-10-181-82-38.ec2.internal) (spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70) (spark.app.name,Spark shell) (spark.fileserver.uri,http://10.181.82.38:54666) (spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk-1.9.14.jar) (spark.eventLog.dir,hdfs:///spark-logs) (spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar) (spark.master,local[*]) (spark.driver.port,54191) (spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar) I searched the old threads; the attached email answers the question about why the vCore setting doesn't take effect, but I think that is not the same issue as mine.
Otherwise the default Yarn Spark setup on EMR can't be adjusted at all? Regards, Shuai
RE: Why always spilling to disk and how to improve it?
Thanks a lot! I just realized Spark is not really an in-memory version of MapReduce :) From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, January 13, 2015 3:53 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Why always spilling to disk and how to improve it? You could try setting the following to tweak the application a little bit: .set("spark.rdd.compress","true") .set("spark.storage.memoryFraction","1") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") For shuffle behavior, you can look at this document https://spark.apache.org/docs/1.1.0/configuration.html#shuffle-behavior Thanks Best Regards On Wed, Jan 14, 2015 at 1:51 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am trying with a small data set. It is only 200m, and what I am doing is just a distinct count on it. But there is a lot of spilling in the log (attached at the end of the email). Basically I use 10G memory, running on a one-node EMR cluster with an r3.8xlarge instance (244G memory and 32 vCPUs). My code is simple, run in spark-shell (~/spark/bin/spark-shell --executor-cores 4 --executor-memory 10G): val llg = sc.textFile("s3://…/part-r-0") // File is around 210.5M, 4.7M rows //val llg = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75")) val ids = llg.flatMap(line => line.split(",").slice(0,1)) // Take the first column as the key val counts = ids.distinct.count I think I should have enough memory, so there should not be any spilling. Can anyone give me an idea why, or where I can tune the system to reduce the spilling (it is not an issue on this dataset, but I want to see how to tune it)? The Spark UI shows only 24.2MB of shuffle write. And if I have 10G of executor memory, why does it need to spill?
2015-01-13 20:01:53,010 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block broadcast_2_piece0
2015-01-13 20:01:53,011 INFO [Spark Context Cleaner] spark.ContextCleaner (Logging.scala:logInfo(59)) - Cleaned broadcast 2
2015-01-13 20:01:53,399 INFO [Executor task launch worker-5] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149 spilling in-memory map of 23.4 MB to disk (3 times so far)
2015-01-13 20:01:53,516 INFO [Executor task launch worker-7] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151 spilling in-memory map of 23.4 MB to disk (3 times so far)
2015-01-13 20:01:53,531 INFO [Executor task launch worker-6] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150 spilling in-memory map of 23.2 MB to disk (3 times so far)
2015-01-13 20:01:53,793 INFO [Executor task launch worker-4] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148 spilling in-memory map of 23.4 MB to disk (3 times so far)
2015-01-13 20:01:54,460 INFO [Executor task launch worker-5] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149 spilling in-memory map of 23.2 MB to disk (4 times so far)
2015-01-13 20:01:54,469 INFO [Executor task launch worker-7] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151 spilling in-memory map of 23.2 MB to disk (4 times so far)
2015-01-13 20:01:55,144 INFO [Executor task launch worker-6] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150 spilling in-memory map of 24.2 MB to disk (4 times so far)
2015-01-13 20:01:55,192 INFO [Executor task launch worker-4] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148 spilling in-memory map of 23.2 MB to disk (4 times so far)
I am trying to collect more benchmarks for the next step: a bigger dataset and more complex logic. Regards, Shuai
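A rough way to reason about when a task spills is the Spark 1.x shuffle memory budget. This is back-of-envelope arithmetic in plain Python, not Spark code, and it assumes the Spark 1.x defaults spark.shuffle.memoryFraction = 0.2 and spark.shuffle.safetyFraction = 0.8:

```python
def shuffle_memory_per_task_mb(executor_mem_mb, concurrent_tasks,
                               memory_fraction=0.2, safety_fraction=0.8):
    """Approximate per-task in-memory budget before ExternalAppendOnlyMap
    spills to disk, under the Spark 1.x shuffle memory model."""
    return executor_mem_mb * memory_fraction * safety_fraction / concurrent_tasks

# A 10G executor with 4 concurrent tasks would allow ~410 MB per task:
print(shuffle_memory_per_task_mb(10 * 1024, 4))  # 409.6
# ...whereas a 512 MB heap (e.g. the 1.x default executor memory) gives
# ~20 MB per task, close to the ~23 MB spill sizes in the log above --
# a hint that the executor may not actually have received 10G.
print(shuffle_memory_per_task_mb(512, 4))        # 20.48
```

The comparison only bounds the shuffle buffer; it says nothing about storage memory, which is governed separately by spark.storage.memoryFraction.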
Why always spilling to disk and how to improve it?
Hi All, I am trying with a small data set. It is only 200m, and what I am doing is just a distinct count on it. But there is a lot of spilling in the log (attached at the end of the email). Basically I use 10G memory, running on a one-node EMR cluster with an r3.8xlarge instance (244G memory and 32 vCPUs). My code is simple, run in spark-shell (~/spark/bin/spark-shell --executor-cores 4 --executor-memory 10G): val llg = sc.textFile("s3://.../part-r-0") // File is around 210.5M, 4.7M rows //val llg = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75", "-240990|161324,9051480,0,2,30.48,75")) val ids = llg.flatMap(line => line.split(",").slice(0,1)) // Take the first column as the key val counts = ids.distinct.count I think I should have enough memory, so there should not be any spilling. Can anyone give me an idea why, or where I can tune the system to reduce the spilling (it is not an issue on this dataset, but I want to see how to tune it)? The Spark UI shows only 24.2MB of shuffle write. And if I have 10G of executor memory, why does it need to spill?
2015-01-13 20:01:53,010 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block broadcast_2_piece0
2015-01-13 20:01:53,011 INFO [Spark Context Cleaner] spark.ContextCleaner (Logging.scala:logInfo(59)) - Cleaned broadcast 2
2015-01-13 20:01:53,399 INFO [Executor task launch worker-5] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149 spilling in-memory map of 23.4 MB to disk (3 times so far)
2015-01-13 20:01:53,516 INFO [Executor task launch worker-7] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151 spilling in-memory map of 23.4 MB to disk (3 times so far)
2015-01-13 20:01:53,531 INFO [Executor task launch worker-6] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150 spilling in-memory map of 23.2 MB to disk (3 times so far)
2015-01-13 20:01:53,793 INFO [Executor task launch worker-4] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148 spilling in-memory map of 23.4 MB to disk (3 times so far)
2015-01-13 20:01:54,460 INFO [Executor task launch worker-5] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 149 spilling in-memory map of 23.2 MB to disk (4 times so far)
2015-01-13 20:01:54,469 INFO [Executor task launch worker-7] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 151 spilling in-memory map of 23.2 MB to disk (4 times so far)
2015-01-13 20:01:55,144 INFO [Executor task launch worker-6] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 150 spilling in-memory map of 24.2 MB to disk (4 times so far)
2015-01-13 20:01:55,192 INFO [Executor task launch worker-4] collection.ExternalAppendOnlyMap (Logging.scala:logInfo(59)) - Thread 148 spilling in-memory map of 23.2 MB to disk (4 times so far)
I am trying to collect more benchmarks for the next step: a bigger dataset and more complex logic. Regards, Shuai
Re: S3 files, Spark job hungsup
Is it possible that too many connections are open to read from s3 from one node? I had this issue before because I opened a few hundred files on s3 from one node. It just blocked without error until a timeout later. On Monday, December 22, 2014, durga durgak...@gmail.com wrote: Hi All, I am facing a strange issue sporadically. Occasionally my spark job hangs on reading s3 files. It is not throwing an exception or making any progress; it just hangs there. Is this a known issue? Please let me know how I could solve it. Thanks, -D -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/S3-files-Spark-job-hungsup-tp20806.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
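One defensive pattern against the "too many open S3 connections from one node" failure mode described above is to cap how many reads are in flight at once. A minimal sketch in plain Python with a semaphore; `MAX_OPEN`, `read_object`, and the stand-in `fetch` function are all hypothetical, not part of any S3 client:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_OPEN = 8  # hypothetical cap; in practice tune to the S3 client's pool size
gate = threading.BoundedSemaphore(MAX_OPEN)

def read_object(key, fetch):
    """Fetch one object while holding a slot, so at most MAX_OPEN
    requests from this node are in flight at any moment."""
    with gate:
        return fetch(key)

# Stand-in "fetch" that just returns the key length; a real job would
# open and drain the S3 stream here instead.
keys = ["part-%05d" % i for i in range(100)]
with ThreadPoolExecutor(max_workers=32) as pool:
    sizes = list(pool.map(lambda k: read_object(k, len), keys))
print(len(sizes))  # 100
```

Even with 32 worker threads, the semaphore ensures only 8 "opens" run concurrently; the rest block until a slot frees, instead of silently exhausting the connection pool.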
Network file input cannot be recognized?
Hi, I am running code which takes a network file location (not HDFS) as input. But sc.textFile("\\\\networklocation\\README.md") can't recognize a network location starting with "\\" as a valid location, because it only accepts HDFS and local file name formats? Does anyone have an idea how I can use a network file location as input to create an RDD? Regards, Shuai
Find the file info of when load the data into RDD
Hi All, When I load a folder into RDDs, is there any way for me to find the input file name of a particular partition, so I can track which file each partition came from? In Hadoop, I can find this information through the code: FileSplit fileSplit = (FileSplit) context.getInputSplit(); String strFilename = fileSplit.getPath().getName(); But how can I do this in Spark? Regards, Shuai
Re: Find the file info of when load the data into RDD
I just found a possible answer: http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ I will give it a try. It is a bit troublesome, but if it works, it will give me what I want. Sorry to bother everyone here. Regards, Shuai On Sun, Dec 21, 2014 at 4:43 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, When I load a folder into RDDs, is there any way for me to find the input file name of a particular partition, so I can track which file each partition came from? In Hadoop, I can find this information through the code: FileSplit fileSplit = (FileSplit) context.getInputSplit(); String strFilename = fileSplit.getPath().getName(); But how can I do this in Spark? Regards, Shuai
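The approach in that post boils down to tagging each record with the split it came from. The idea can be simulated in plain Python (hypothetical file contents standing in for HDFS splits; no Hadoop or Spark involved):

```python
def load_with_filenames(files):
    """Pair every record with the name of the file (split) it came from,
    analogous to FileSplit.getPath().getName() in Hadoop."""
    return [(name, line) for name, lines in files.items() for line in lines]

# Hypothetical split contents keyed by file name.
files = {"part-00000": ["a", "b"], "part-00001": ["c"]}
records = load_with_filenames(files)
print(records)  # [('part-00000', 'a'), ('part-00000', 'b'), ('part-00001', 'c')]
```

Once every record carries its source name, any downstream grouping or filtering by file is an ordinary key operation rather than something recovered from the input format.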
Any potential issue if I create a SparkContext in executor
Hi All, I notice that if we create a Spark context in the driver, we need to call the stop method to clear it. SparkConf sparkConf = new SparkConf().setAppName("FinancialEngineExecutor"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); ... String inputPath = props[0].getProperty(Constants.S3_INPUT_FILES); JavaRDD<String> lines = ctx.textFile(inputPath); EngineFlatMapFunction engine = new EngineFlatMapFunction(); engine.setAnalysisConfigurations(props); lines.mapPartitionsToPair(engine); ... ctx.stop(); And I have the below code in the closure (EngineFlatMapFunction.java): Configuration hadoopConfiguration = new Configuration(new JavaSparkContext(new SparkConf()).hadoopConfiguration()); Is there any issue with that? I need the Hadoop configuration in the closure, but the Configuration class itself is not serializable, so I retrieve it on the executor side. Will there be any issue if I create the Spark context in the above code without calling stop on it? Regards, Shuai
RE: Control default partition when load a RDD from HDFS
Hmmm, how would I do that? You mean create one RDD per file? Then I will have tons of RDDs. And my calculation needs to rely on other input, not just the file itself. Can you show some pseudo code for that logic? Regards, Shuai From: Diego García Valverde [mailto:dgarci...@agbar.es] Sent: Wednesday, December 17, 2014 11:04 AM To: Shuai Zheng; 'Sun, Rui'; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS Why is it not a good option to create an RDD per 200Mb file and then apply the pre-calculations before merging them? I think the partitions per RDD must be transparent to the pre-calculations, not fixed to optimize the Spark map/reduce processes. From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 4:01 PM To: 'Sun, Rui'; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS Nice, that is the answer I want. Thanks! From: Sun, Rui [mailto:rui@intel.com] Sent: Wednesday, December 17, 2014 1:30 AM To: Shuai Zheng; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS Hi, Shuai, How did you turn off the file split in Hadoop? I guess you might have implemented a customized FileInputFormat which overrides isSplitable() to return FALSE. If you do have such a FileInputFormat, you can simply pass it as a constructor parameter to HadoopRDD or NewHadoopRDD in Spark. From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 4:16 PM To: user@spark.apache.org Subject: Control default partition when load a RDD from HDFS Hi All, My application loads 1000 files, each from 200M to a few GB, and combines them with other data to do calculations. Some pre-calculation must be done at the file level; after that, the results need to be combined for further calculation.
In Hadoop, it is simple because I can turn off the file split in the input format (to enforce that each file goes to the same mapper); then I do the file-level calculation in the mapper and pass the result to the reducer. But in Spark, how can I do it? Basically I want to make sure that after I load these files into an RDD, it is partitioned by file (no file splitting and also no merging), so I can call mapPartitions. Is there any way I can control the default partitioning when I load the RDD? This might be the default behavior of Spark (partitioning by file when the RDD is first loaded), but I can't find any document to support my guess; if not, can I enforce this kind of partitioning? Because the total file size is big, I don't want to re-partition in the code. Regards, Shuai _ Disclaimer: http://disclaimer.agbar.com
RE: Control default partition when load a RDD from HDFS
Nice, that is the answer I want. Thanks! From: Sun, Rui [mailto:rui@intel.com] Sent: Wednesday, December 17, 2014 1:30 AM To: Shuai Zheng; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS Hi, Shuai, How did you turn off the file split in Hadoop? I guess you might have implemented a customized FileInputFormat which overrides isSplitable() to return FALSE. If you do have such a FileInputFormat, you can simply pass it as a constructor parameter to HadoopRDD or NewHadoopRDD in Spark. From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 4:16 PM To: user@spark.apache.org Subject: Control default partition when load a RDD from HDFS Hi All, My application loads 1000 files, each from 200M to a few GB, and combines them with other data to do calculations. Some pre-calculation must be done at the file level; after that, the results need to be combined for further calculation. In Hadoop, it is simple because I can turn off the file split in the input format (to enforce that each file goes to the same mapper); then I do the file-level calculation in the mapper and pass the result to the reducer. But in Spark, how can I do it? Basically I want to make sure that after I load these files into an RDD, it is partitioned by file (no file splitting and also no merging), so I can call mapPartitions. Is there any way I can control the default partitioning when I load the RDD? This might be the default behavior of Spark (partitioning by file when the RDD is first loaded), but I can't find any document to support my guess; if not, can I enforce this kind of partitioning? Because the total file size is big, I don't want to re-partition in the code. Regards, Shuai
Control default partition when load a RDD from HDFS
Hi All, My application loads 1000 files, each from 200M to a few GB, and combines them with other data to do calculations. Some pre-calculation must be done at the file level; after that, the results need to be combined for further calculation. In Hadoop, it is simple because I can turn off the file split in the input format (to enforce that each file goes to the same mapper); then I do the file-level calculation in the mapper and pass the result to the reducer. But in Spark, how can I do it? Basically I want to make sure that after I load these files into an RDD, it is partitioned by file (no file splitting and also no merging), so I can call mapPartitions. Is there any way I can control the default partitioning when I load the RDD? This might be the default behavior of Spark (partitioning by file when the RDD is first loaded), but I can't find any document to support my guess; if not, can I enforce this kind of partitioning? Because the total file size is big, I don't want to re-partition in the code. Regards, Shuai
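Sun Rui's suggestion in this thread amounts to a whole-file input format. The partitioning rule it produces can be sketched in plain Python (purely illustrative; `partition_by_file` and the sample files are hypothetical, no Hadoop involved):

```python
def partition_by_file(files):
    """One partition per input file: a file is never split across
    partitions, and two files are never merged into one partition --
    in Hadoop terms, what isSplitable() == False buys you."""
    return [list(lines) for lines in files.values()]

files = {"f1.csv": ["r1", "r2"], "f2.csv": ["r3"]}
parts = partition_by_file(files)
print([len(p) for p in parts])  # [2, 1]
```

With this shape guaranteed, a per-partition function (the Spark analogue being mapPartitions) sees exactly one file's records at a time, which is what the file-level pre-calculation requires.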
RE: Any Replicated RDD in Spark?
Matei, Thanks for the reply. I don't worry that much about more code because I am migrating from MapReduce, so I have existing code to handle it. But if I want to use a new tech, I will always prefer the right way, not a temporary easy way! I will go with RDDs first to test the performance. Thanks! Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Wednesday, November 05, 2014 6:27 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? If you start with an RDD, you do have to collect it to the driver and broadcast it to do this. Between the two options you listed, I think this one is simpler to implement, and there won't be a huge difference in performance, so you can go for it. Opening InputStreams to a distributed file system by hand can be a lot of code. Matei On Nov 5, 2014, at 12:37 PM, Shuai Zheng szheng.c...@gmail.com wrote: And another similar case: if I get an RDD from a previous step, but the next step should be a map-side join (so I need to broadcast this RDD to every node), what is the best way for me to do that? Collect the RDD in the driver first and create a broadcast? Or is there any shortcut in Spark for this? Thanks! -Original Message- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, November 05, 2014 3:32 PM To: 'Matei Zaharia' Cc: 'user@spark.apache.org' Subject: RE: Any Replicated RDD in Spark? Nice. Then I have another question: if I have a file (or a set of files: part-0, part-1, maybe a few hundred MB of csv up to 1-2 GB, created by another program) and need to create a hashtable from it, then later broadcast it to each node to allow queries (map-side join), I have two options: 1, Just load the file in general code (open an InputStream, etc.), parse the content, and then create the broadcast from it. 2, Use the standard way to create an RDD from these files, run a map to parse it, then collect it as a map and wrap the result as a broadcast to push to all nodes again.
I think option 2 might be more consistent with Spark's concepts (and less code?). But how about the performance? The gain is that the load and parse are parallelized; the penalty is that after the load we need to collect and broadcast the result again. Please share your opinion; I am not sure what the best practice is here (in theory either way works, but in the real world, which one is better?). Regards, Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Monday, November 03, 2014 4:15 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see which records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent the last two years on Hadoop but am new to Spark. I am planning to move one of my existing systems to Spark to get some enhanced features. My question is: if I try to do a map-side join (something similar to the Replicated keyword in Pig), how can I do it? Is there any way to declare an RDD as replicated (meaning distribute it to all nodes so each node has a full copy)? I know I can use an accumulator to get this feature, but I am not sure what the best practice is. And if I use an accumulator to broadcast the data set, can I then (after broadcast) convert it into an RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Any Replicated RDD in Spark?
Nice. Then I have another question: if I have a file (or a set of files: part-0, part-1, maybe a few hundred MB of csv up to 1-2 GB, created by another program) and need to create a hashtable from it, then later broadcast it to each node to allow queries (map-side join), I have two options: 1, Just load the file in general code (open an InputStream, etc.), parse the content, and then create the broadcast from it. 2, Use the standard way to create an RDD from these files, run a map to parse it, then collect it as a map and wrap the result as a broadcast to push to all nodes again. I think option 2 might be more consistent with Spark's concepts (and less code?). But how about the performance? The gain is that the load and parse are parallelized; the penalty is that after the load we need to collect and broadcast the result again. Please share your opinion; I am not sure what the best practice is here (in theory either way works, but in the real world, which one is better?). Regards, Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Monday, November 03, 2014 4:15 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see which records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent the last two years on Hadoop but am new to Spark. I am planning to move one of my existing systems to Spark to get some enhanced features. My question is: if I try to do a map-side join (something similar to the Replicated keyword in Pig), how can I do it?
Is there any way to declare an RDD as replicated (meaning distribute it to all nodes so each node has a full copy)? I know I can use an accumulator to get this feature, but I am not sure what the best practice is. And if I use an accumulator to broadcast the data set, can I then (after broadcast) convert it into an RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
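The broadcast-then-lookup pattern Matei describes in this thread (broadcast a hash table, then probe it inside flatMap/mapPartitions) can be illustrated without Spark; here the "broadcast" is just a shared read-only dict, not Spark's Broadcast API, and all names are hypothetical:

```python
# "Broadcast" side: the small table, built once and shared read-only.
small_table = {"k1": "v1", "k2": "v2"}

def map_side_join(partition, lookup):
    """Inner-join each (key, left_value) record of one partition against
    the broadcast table, emitting joined rows and dropping keys with no
    match -- no shuffle of the big side is ever needed."""
    return [(k, left, lookup[k]) for k, left in partition if k in lookup]

partition = [("k1", 10), ("k3", 30), ("k2", 20)]
print(map_side_join(partition, small_table))
# [('k1', 10, 'v1'), ('k2', 20, 'v2')]
```

The design point is that only the small table moves (once, to every node), while the big side is joined in place per partition; this is exactly why the technique only pays off when one side fits comfortably in each executor's memory.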
RE: Any Replicated RDD in Spark?
And another similar case: if I get an RDD from a previous step, but the next step should be a map-side join (so I need to broadcast this RDD to every node), what is the best way for me to do that? Collect the RDD in the driver first and create a broadcast? Or is there any shortcut in Spark for this? Thanks! -Original Message- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, November 05, 2014 3:32 PM To: 'Matei Zaharia' Cc: 'user@spark.apache.org' Subject: RE: Any Replicated RDD in Spark? Nice. Then I have another question: if I have a file (or a set of files: part-0, part-1, maybe a few hundred MB of csv up to 1-2 GB, created by another program) and need to create a hashtable from it, then later broadcast it to each node to allow queries (map-side join), I have two options: 1, Just load the file in general code (open an InputStream, etc.), parse the content, and then create the broadcast from it. 2, Use the standard way to create an RDD from these files, run a map to parse it, then collect it as a map and wrap the result as a broadcast to push to all nodes again. I think option 2 might be more consistent with Spark's concepts (and less code?). But how about the performance? The gain is that the load and parse are parallelized; the penalty is that after the load we need to collect and broadcast the result again. Please share your opinion; I am not sure what the best practice is here (in theory either way works, but in the real world, which one is better?). Regards, Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Monday, November 03, 2014 4:15 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see which records match it).
Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent the last two years on Hadoop but am new to Spark. I am planning to move one of my existing systems to Spark to get some enhanced features. My question is: if I try to do a map-side join (something similar to the Replicated keyword in Pig), how can I do it? Is there any way to declare an RDD as replicated (meaning distribute it to all nodes so each node has a full copy)? I know I can use an accumulator to get this feature, but I am not sure what the best practice is. And if I use an accumulator to broadcast the data set, can I then (after broadcast) convert it into an RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Any Replicated RDD in Spark?
Hi All, I have spent the last two years on Hadoop but am new to Spark. I am planning to move one of my existing systems to Spark to get some enhanced features. My question is: if I try to do a map-side join (something similar to the Replicated keyword in Pig), how can I do it? Is there any way to declare an RDD as replicated (meaning distribute it to all nodes so each node has a full copy)? I know I can use an accumulator to get this feature, but I am not sure what the best practice is. And if I use an accumulator to broadcast the data set, can I then (after broadcast) convert it into an RDD and do the join? Regards, Shuai
Memory limitation on EMR Node?
Hi, I am planning to run Spark on EMR, and my application might take a lot of memory. On EMR, I know there is a hard limit of 16G physical memory for an individual mapper/reducer (otherwise I get an exception; this is confirmed by the AWS EMR team, at least as the spec at this moment). If I use Yarn on EMR and submit the Spark job to YARN, I assume YARN takes responsibility for resource allocation, so does the 16G physical memory limit still apply? Is that a reasonable guess, or does anyone have experience using more than 16G of memory per executor on EMR? And I notice there are examples in the docs that allocate more than 16G of memory, so if I run a Spark cluster by itself, can I use more memory? Regards, Shuai